diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 15e7ef472..aefffc891 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -115,19 +115,6 @@ impl SendRequest { self.conn_timeout = timeout; self } - - fn poll_timeout(&mut self) -> Poll<(), SendRequestError> { - if self.timeout.is_none() { - self.timeout = Some(Timeout::new( - Duration::from_secs(5), Arbiter::handle()).unwrap()); - } - - match self.timeout.as_mut().unwrap().poll() { - Ok(Async::Ready(())) => Err(SendRequestError::Timeout), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => unreachable!() - } - } } impl Future for SendRequest { @@ -135,8 +122,6 @@ impl Future for SendRequest { type Error = SendRequestError; fn poll(&mut self) -> Poll { - self.poll_timeout()?; - loop { let state = mem::replace(&mut self.state, State::None); @@ -170,6 +155,10 @@ impl Future for SendRequest { _ => IoBody::Done, }; + let timeout = self.timeout.take().unwrap_or_else(|| + Timeout::new( + Duration::from_secs(5), Arbiter::handle()).unwrap()); + let pl = Box::new(Pipeline { body, writer, conn: Some(conn), @@ -180,6 +169,7 @@ impl Future for SendRequest { decompress: None, should_decompress: self.req.response_decompress(), write_state: RunningState::Running, + timeout: Some(timeout), }); self.state = State::Send(pl); }, @@ -218,6 +208,7 @@ pub(crate) struct Pipeline { decompress: Option, should_decompress: bool, write_state: RunningState, + timeout: Option, } enum IoBody { @@ -292,10 +283,14 @@ impl Pipeline { let mut need_run = false; // need write? - if let Async::NotReady = self.poll_write() + match self.poll_write() .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))? { - need_run = true; + Async::NotReady => need_run = true, + Async::Ready(_) => { + let _ = self.poll_timeout() + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))?; + } } // need read? @@ -343,6 +338,18 @@ impl Pipeline { } } + fn poll_timeout(&mut self) -> Poll<(), SendRequestError> { + if self.timeout.is_some() { + match self.timeout.as_mut().unwrap().poll() { + Ok(Async::Ready(())) => Err(SendRequestError::Timeout), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => unreachable!() + } + } else { + Ok(Async::NotReady) + } + } + #[inline] fn poll_write(&mut self) -> Poll<(), Error> { if self.write_state == RunningState::Done || self.conn.is_none() { @@ -350,7 +357,6 @@ impl Pipeline { } let mut done = false; - if self.drain.is_none() && self.write_state != RunningState::Paused { 'outter: loop { let result = match mem::replace(&mut self.body, IoBody::Done) { diff --git a/src/server/channel.rs b/src/server/channel.rs index 390aaee87..49ea586e1 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -69,7 +69,7 @@ impl Future for HttpChannel where T: IoStream, H: HttpHandler + 'sta type Error = (); fn poll(&mut self) -> Poll { - if !self.node.is_none() { + if self.node.is_some() { let el = self as *mut _; self.node = Some(Node::new(el)); let _ = match self.proto {