diff --git a/Cargo.toml b/Cargo.toml index 94c2f609b..f5055122f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,8 @@ bytes = "0.4" futures = "0.1" tokio-io = "0.1" tokio-core = "0.1" + +h2 = { path = '../h2' } # h2 = { git = 'https://github.com/carllerche/h2', optional = true } # tls diff --git a/src/decode.rs b/src/decode.rs deleted file mode 100644 index e8115f47f..000000000 --- a/src/decode.rs +++ /dev/null @@ -1,271 +0,0 @@ -#![allow(dead_code)] - -use std::{io, usize}; - -use futures::{Async, Poll}; -use bytes::{Bytes, BytesMut}; - -use self::Kind::{Length, Chunked, Eof}; - -/// Decoders to handle different Transfer-Encodings. -/// -/// If a message body does not include a Transfer-Encoding, it *should* -/// include a Content-Length header. -#[derive(Debug, Clone, PartialEq)] -pub struct Decoder { - kind: Kind, -} - -impl Decoder { - pub fn length(x: u64) -> Decoder { - Decoder { kind: Kind::Length(x) } - } - - pub fn chunked() -> Decoder { - Decoder { kind: Kind::Chunked(ChunkedState::Size, 0) } - } - - pub fn eof() -> Decoder { - Decoder { kind: Kind::Eof(false) } - } -} - -#[derive(Debug, Clone, PartialEq)] -enum Kind { - /// A Reader used when a Content-Length header is passed with a positive integer. - Length(u64), - /// A Reader used when Transfer-Encoding is `chunked`. - Chunked(ChunkedState, u64), - /// A Reader used for responses that don't indicate a length or chunked. - /// - /// Note: This should only used for `Response`s. It is illegal for a - /// `Request` to be made with both `Content-Length` and - /// `Transfer-Encoding: chunked` missing, as explained from the spec: - /// - /// > If a Transfer-Encoding header field is present in a response and - /// > the chunked transfer coding is not the final encoding, the - /// > message body length is determined by reading the connection until - /// > it is closed by the server. If a Transfer-Encoding header field - /// > is present in a request and the chunked transfer coding is not - /// > the final encoding, the message body length cannot be determined - /// > reliably; the server MUST respond with the 400 (Bad Request) - /// > status code and then close the connection. - Eof(bool), -} - -#[derive(Debug, PartialEq, Clone)] -enum ChunkedState { - Size, - SizeLws, - Extension, - SizeLf, - Body, - BodyCr, - BodyLf, - EndCr, - EndLf, - End, -} - -impl Decoder { - pub fn is_eof(&self) -> bool { - trace!("is_eof? {:?}", self); - match self.kind { - Length(0) | - Chunked(ChunkedState::End, _) | - Eof(true) => true, - _ => false, - } - } -} - -impl Decoder { - pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { - match self.kind { - Length(ref mut remaining) => { - trace!("Sized read, remaining={:?}", remaining); - if *remaining == 0 { - Ok(Async::Ready(None)) - } else { - if body.is_empty() { - return Ok(Async::NotReady) - } - let len = body.len() as u64; - let buf; - if *remaining > len { - buf = body.take().freeze(); - *remaining -= len; - } else { - buf = body.split_to(*remaining as usize).freeze(); - *remaining = 0; - } - trace!("Length read: {}", buf.len()); - Ok(Async::Ready(Some(buf))) - } - } - Chunked(ref mut state, ref mut size) => { - loop { - let mut buf = None; - // advances the chunked state - *state = try_ready!(state.step(body, size, &mut buf)); - if *state == ChunkedState::End { - trace!("End of chunked stream"); - return Ok(Async::Ready(None)); - } - if let Some(buf) = buf { - return Ok(Async::Ready(Some(buf))); - } - if body.is_empty() { - return Ok(Async::NotReady); - } - } - } - Eof(ref mut is_eof) => { - if *is_eof { - Ok(Async::Ready(None)) - } else if !body.is_empty() { - Ok(Async::Ready(Some(body.take().freeze()))) - } else { - Ok(Async::NotReady) - } - } - } - } -} - -macro_rules! byte ( - ($rdr:ident) => ({ - if $rdr.len() > 0 { - let b = $rdr[0]; - $rdr.split_to(1); - b - } else { - return Ok(Async::NotReady) - } - }) -); - -impl ChunkedState { - fn step(&self, body: &mut BytesMut, size: &mut u64, buf: &mut Option) - -> Poll - { - use self::ChunkedState::*; - match *self { - Size => ChunkedState::read_size(body, size), - SizeLws => ChunkedState::read_size_lws(body), - Extension => ChunkedState::read_extension(body), - SizeLf => ChunkedState::read_size_lf(body, size), - Body => ChunkedState::read_body(body, size, buf), - BodyCr => ChunkedState::read_body_cr(body), - BodyLf => ChunkedState::read_body_lf(body), - EndCr => ChunkedState::read_end_cr(body), - EndLf => ChunkedState::read_end_lf(body), - End => Ok(Async::Ready(ChunkedState::End)), - } - } - fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { - trace!("Read chunk hex size"); - let radix = 16; - match byte!(rdr) { - b @ b'0'...b'9' => { - *size *= radix; - *size += u64::from(b - b'0'); - } - b @ b'a'...b'f' => { - *size *= radix; - *size += u64::from(b + 10 - b'a'); - } - b @ b'A'...b'F' => { - *size *= radix; - *size += u64::from(b + 10 - b'A'); - } - b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => return Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), - _ => { - return Err(io::Error::new(io::ErrorKind::InvalidInput, - "Invalid chunk size line: Invalid Size")); - } - } - Ok(Async::Ready(ChunkedState::Size)) - } - fn read_size_lws(rdr: &mut BytesMut) -> Poll { - trace!("read_size_lws"); - match byte!(rdr) { - // LWS can follow the chunk size, but no more digits can come - b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => { - Err(io::Error::new(io::ErrorKind::InvalidInput, - "Invalid chunk size linear white space")) - } - } - } - fn read_extension(rdr: &mut BytesMut) -> Poll { - trace!("read_extension"); - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions - } - } - fn read_size_lf(rdr: &mut BytesMut, size: &mut u64) -> Poll { - trace!("Chunk size is {:?}", size); - match byte!(rdr) { - b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), - b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), - _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF")), - } - } - - fn read_body(rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option) - -> Poll - { - trace!("Chunked read, remaining={:?}", rem); - - let len = rdr.len() as u64; - if len == 0 { - Ok(Async::Ready(ChunkedState::Body)) - } else { - let slice; - if *rem > len { - slice = rdr.take().freeze(); - *rem -= len; - } else { - slice = rdr.split_to(*rem as usize).freeze(); - *rem = 0; - } - *buf = Some(slice); - if *rem > 0 { - Ok(Async::Ready(ChunkedState::Body)) - } else { - Ok(Async::Ready(ChunkedState::BodyCr)) - } - } - } - - fn read_body_cr(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), - _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR")), - } - } - fn read_body_lf(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::Size)), - _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF")), - } - } - fn read_end_cr(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), - _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR")), - } - } - fn read_end_lf(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::End)), - _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF")), - } - } -} diff --git a/src/reader.rs b/src/h1.rs similarity index 74% rename from src/reader.rs rename to src/h1.rs index 8f0d2e83b..ba7b23061 100644 --- a/src/reader.rs +++ b/src/h1.rs @@ -3,19 +3,19 @@ use std::{self, io, ptr}; use httparse; use http::{Method, Version, HttpTryFrom, HeaderMap}; use http::header::{self, HeaderName, HeaderValue}; -use bytes::{BytesMut, BufMut}; +use bytes::{Bytes, BytesMut, BufMut}; use futures::{Async, Poll}; use tokio_io::AsyncRead; use percent_encoding; use error::ParseError; -use decode::Decoder; use httprequest::HttpRequest; use payload::{Payload, PayloadError, PayloadSender}; const MAX_HEADERS: usize = 100; const INIT_BUFFER_SIZE: usize = 8192; const MAX_BUFFER_SIZE: usize = 131_072; +const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; enum Decoding { Paused, @@ -33,12 +33,25 @@ pub(crate) struct Reader { payload: Option, } +#[derive(Debug)] +pub(crate) enum ReaderItem { + Http1(HttpRequest, Payload), + Http2, +} + #[derive(Debug)] pub(crate) enum ReaderError { Payload, Error(ParseError), } +#[derive(Debug)] +enum Message { + Http1(HttpRequest, Option), + Http2, + NotReady, +} + impl Reader { pub fn new() -> Reader { Reader { @@ -110,8 +123,8 @@ impl Reader { } loop { - match try!(Reader::parse_message(&mut self.read_buf).map_err(ReaderError::Error)) { - Some((msg, decoder)) => { + match Reader::parse_message(&mut self.read_buf).map_err(ReaderError::Error)? { + Message::Http1(msg, decoder) => { let payload = if let Some(decoder) = decoder { let (tx, rx) = Payload::new(false); let payload = PayloadInfo { @@ -160,7 +173,9 @@ impl Reader { }; return Ok(Async::Ready((msg, payload))); }, - None => { + Message::Http2 => { + }, + Message::NotReady => { if self.read_buf.capacity() >= MAX_BUFFER_SIZE { debug!("MAX_BUFFER_SIZE reached, closing"); return Err(ReaderError::Error(ParseError::TooLarge)); @@ -205,11 +220,14 @@ impl Reader { } } - fn parse_message(buf: &mut BytesMut) - -> Result)>, ParseError> + fn parse_message(buf: &mut BytesMut) -> Result { - if buf.is_empty() { - return Ok(None); + println!("BUF: {:?}", buf); + if buf.is_empty() || buf.len() < 14 { + return Ok(Message::NotReady); + } + if &buf[..14] == &HTTP2_PREFACE[..] { + return Ok(Message::Http2) } // Parse http message @@ -243,7 +261,7 @@ impl Reader { let headers_len = req.headers.len(); (len, method, path, version, headers_len) } - httparse::Status::Partial => return Ok(None), + httparse::Status::Partial => return Ok(Message::NotReady), } }; @@ -320,7 +338,7 @@ impl Reader { None } }; - Ok(Some((msg, decoder))) + Ok(Message::Http1(msg, decoder)) } } @@ -345,6 +363,268 @@ fn record_header_indices(bytes: &[u8], } } +/// Decoders to handle different Transfer-Encodings. +/// +/// If a message body does not include a Transfer-Encoding, it *should* +/// include a Content-Length header. +#[derive(Debug, Clone, PartialEq)] +pub struct Decoder { + kind: Kind, +} + +impl Decoder { + pub fn length(x: u64) -> Decoder { + Decoder { kind: Kind::Length(x) } + } + + pub fn chunked() -> Decoder { + Decoder { kind: Kind::Chunked(ChunkedState::Size, 0) } + } + + pub fn eof() -> Decoder { + Decoder { kind: Kind::Eof(false) } + } +} + +#[derive(Debug, Clone, PartialEq)] +enum Kind { + /// A Reader used when a Content-Length header is passed with a positive integer. + Length(u64), + /// A Reader used when Transfer-Encoding is `chunked`. + Chunked(ChunkedState, u64), + /// A Reader used for responses that don't indicate a length or chunked. + /// + /// Note: This should only used for `Response`s. It is illegal for a + /// `Request` to be made with both `Content-Length` and + /// `Transfer-Encoding: chunked` missing, as explained from the spec: + /// + /// > If a Transfer-Encoding header field is present in a response and + /// > the chunked transfer coding is not the final encoding, the + /// > message body length is determined by reading the connection until + /// > it is closed by the server. If a Transfer-Encoding header field + /// > is present in a request and the chunked transfer coding is not + /// > the final encoding, the message body length cannot be determined + /// > reliably; the server MUST respond with the 400 (Bad Request) + /// > status code and then close the connection. + Eof(bool), +} + +#[derive(Debug, PartialEq, Clone)] +enum ChunkedState { + Size, + SizeLws, + Extension, + SizeLf, + Body, + BodyCr, + BodyLf, + EndCr, + EndLf, + End, +} + +impl Decoder { + pub fn is_eof(&self) -> bool { + trace!("is_eof? {:?}", self); + match self.kind { + Kind::Length(0) | + Kind::Chunked(ChunkedState::End, _) | + Kind::Eof(true) => true, + _ => false, + } + } +} + +impl Decoder { + pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { + match self.kind { + Kind::Length(ref mut remaining) => { + trace!("Sized read, remaining={:?}", remaining); + if *remaining == 0 { + Ok(Async::Ready(None)) + } else { + if body.is_empty() { + return Ok(Async::NotReady) + } + let len = body.len() as u64; + let buf; + if *remaining > len { + buf = body.take().freeze(); + *remaining -= len; + } else { + buf = body.split_to(*remaining as usize).freeze(); + *remaining = 0; + } + trace!("Length read: {}", buf.len()); + Ok(Async::Ready(Some(buf))) + } + } + Kind::Chunked(ref mut state, ref mut size) => { + loop { + let mut buf = None; + // advances the chunked state + *state = try_ready!(state.step(body, size, &mut buf)); + if *state == ChunkedState::End { + trace!("End of chunked stream"); + return Ok(Async::Ready(None)); + } + if let Some(buf) = buf { + return Ok(Async::Ready(Some(buf))); + } + if body.is_empty() { + return Ok(Async::NotReady); + } + } + } + Kind::Eof(ref mut is_eof) => { + if *is_eof { + Ok(Async::Ready(None)) + } else if !body.is_empty() { + Ok(Async::Ready(Some(body.take().freeze()))) + } else { + Ok(Async::NotReady) + } + } + } + } +} + +macro_rules! byte ( + ($rdr:ident) => ({ + if $rdr.len() > 0 { + let b = $rdr[0]; + $rdr.split_to(1); + b + } else { + return Ok(Async::NotReady) + } + }) +); + +impl ChunkedState { + fn step(&self, body: &mut BytesMut, size: &mut u64, buf: &mut Option) + -> Poll + { + use self::ChunkedState::*; + match *self { + Size => ChunkedState::read_size(body, size), + SizeLws => ChunkedState::read_size_lws(body), + Extension => ChunkedState::read_extension(body), + SizeLf => ChunkedState::read_size_lf(body, size), + Body => ChunkedState::read_body(body, size, buf), + BodyCr => ChunkedState::read_body_cr(body), + BodyLf => ChunkedState::read_body_lf(body), + EndCr => ChunkedState::read_end_cr(body), + EndLf => ChunkedState::read_end_lf(body), + End => Ok(Async::Ready(ChunkedState::End)), + } + } + fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { + trace!("Read chunk hex size"); + let radix = 16; + match byte!(rdr) { + b @ b'0'...b'9' => { + *size *= radix; + *size += u64::from(b - b'0'); + } + b @ b'a'...b'f' => { + *size *= radix; + *size += u64::from(b + 10 - b'a'); + } + b @ b'A'...b'F' => { + *size *= radix; + *size += u64::from(b + 10 - b'A'); + } + b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => return Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), + _ => { + return Err(io::Error::new(io::ErrorKind::InvalidInput, + "Invalid chunk size line: Invalid Size")); + } + } + Ok(Async::Ready(ChunkedState::Size)) + } + fn read_size_lws(rdr: &mut BytesMut) -> Poll { + trace!("read_size_lws"); + match byte!(rdr) { + // LWS can follow the chunk size, but no more digits can come + b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => { + Err(io::Error::new(io::ErrorKind::InvalidInput, + "Invalid chunk size linear white space")) + } + } + } + fn read_extension(rdr: &mut BytesMut) -> Poll { + trace!("read_extension"); + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions + } + } + fn read_size_lf(rdr: &mut BytesMut, size: &mut u64) -> Poll { + trace!("Chunk size is {:?}", size); + match byte!(rdr) { + b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), + b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF")), + } + } + + fn read_body(rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option) + -> Poll + { + trace!("Chunked read, remaining={:?}", rem); + + let len = rdr.len() as u64; + if len == 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + let slice; + if *rem > len { + slice = rdr.take().freeze(); + *rem -= len; + } else { + slice = rdr.split_to(*rem as usize).freeze(); + *rem = 0; + } + *buf = Some(slice); + if *rem > 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + Ok(Async::Ready(ChunkedState::BodyCr)) + } + } + } + + fn read_body_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR")), + } + } + fn read_body_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::Size)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF")), + } + } + fn read_end_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR")), + } + } + fn read_end_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::End)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF")), + } + } +} #[cfg(test)] mod tests { diff --git a/src/h2.rs b/src/h2.rs new file mode 100644 index 000000000..21ab1e191 --- /dev/null +++ b/src/h2.rs @@ -0,0 +1,51 @@ +use std::{io, cmp}; +use std::io::{Read, Write}; +use bytes::{Buf, Bytes}; +use futures::Poll; +use tokio_io::{AsyncRead, AsyncWrite}; + + +struct IoWrapper { + unread: Option, + inner: T, +} + +impl Read for IoWrapper { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if let Some(mut bytes) = self.unread.take() { + let size = cmp::min(buf.len(), bytes.len()); + buf.copy_from_slice(&bytes[..size]); + bytes.split_to(size); + if !bytes.is_empty() { + self.unread = Some(bytes); + } + Ok(size) + } else { + self.inner.read(buf) + } + } +} + +impl Write for IoWrapper { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl AsyncRead for IoWrapper { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} + +impl AsyncWrite for IoWrapper { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.inner.shutdown() + } + fn write_buf(&mut self, buf: &mut B) -> Poll { + self.inner.write_buf(buf) + } +} diff --git a/src/lib.rs b/src/lib.rs index 79e193dae..838307950 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,6 @@ mod body; mod context; mod error; mod date; -mod decode; mod httprequest; mod httpresponse; mod logger; @@ -39,12 +38,13 @@ mod payload; mod resource; mod recognizer; mod route; -mod reader; mod task; mod staticfiles; mod server; mod wsframe; mod wsproto; +mod h1; +mod h2; pub mod ws; pub mod dev; diff --git a/src/server.rs b/src/server.rs index 149d5bb40..3d852e50d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -16,8 +16,8 @@ use native_tls::TlsAcceptor; #[cfg(feature="tls")] use tokio_tls::{TlsStream, TlsAcceptorExt}; +use h1; use task::Task; -use reader::{Reader, ReaderError}; use payload::Payload; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; @@ -150,9 +150,16 @@ impl HttpServer, net::SocketAddr, H> { let acc = acceptor.clone(); ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| { + println!("SSL"); TlsAcceptorExt::accept_async(acc.as_ref(), stream) - .map(move |t| IoStream(t, addr)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .map(move |t| { + println!("connected {:?} {:?}", t, addr); + IoStream(t, addr) + }) + .map_err(|err| { + println!("ERR: {:?}", err); + io::Error::new(io::ErrorKind::Other, err) + }) })); } self @@ -181,7 +188,7 @@ impl Handler, io::Error> for HttpServer H: HttpHandler + 'static, { fn error(&mut self, err: io::Error, _: &mut Context) { - trace!("Error handling request: {}", err) + println!("Error handling request: {}", err) } fn handle(&mut self, msg: IoStream, _: &mut Context) @@ -191,7 +198,7 @@ impl Handler, io::Error> for HttpServer HttpChannel{router: Rc::clone(&self.h), addr: msg.1, stream: msg.0, - reader: Reader::new(), + reader: h1::Reader::new(), error: false, items: VecDeque::new(), inactive: VecDeque::new(), @@ -202,7 +209,6 @@ impl Handler, io::Error> for HttpServer } } - struct Entry { task: Task, req: UnsafeCell, @@ -219,7 +225,7 @@ pub struct HttpChannel { #[allow(dead_code)] addr: A, stream: T, - reader: Reader, + reader: h1::Reader, error: bool, items: VecDeque, inactive: VecDeque, @@ -380,7 +386,7 @@ impl Future for HttpChannel self.error = true; if self.items.is_empty() { - if let ReaderError::Error(err) = err { + if let h1::ReaderError::Error(err) = err { self.items.push_back( Entry {task: Task::reply(err), req: UnsafeCell::new(HttpRequest::for_error()), diff --git a/src/task.rs b/src/task.rs index b1486c3ea..ec3f6bd59 100644 --- a/src/task.rs +++ b/src/task.rs @@ -10,7 +10,7 @@ use http::header::{HeaderValue, use bytes::BytesMut; use futures::{Async, Future, Poll, Stream}; use futures::task::{Task as FutureTask, current as current_task}; -use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::AsyncWrite; use date; use body::Body; @@ -316,7 +316,7 @@ impl Task { } pub(crate) fn poll_io(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll - where T: AsyncRead + AsyncWrite + where T: AsyncWrite { trace!("POLL-IO frames:{:?}", self.frames.len()); // response is completed