1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

start client timeout for response only

This commit is contained in:
Nikolay Kim 2018-04-04 20:15:47 -07:00
parent c1af59c618
commit eeae0ddab4
2 changed files with 25 additions and 19 deletions

View File

@ -115,19 +115,6 @@ impl SendRequest {
self.conn_timeout = timeout;
self
}
fn poll_timeout(&mut self) -> Poll<(), SendRequestError> {
if self.timeout.is_none() {
self.timeout = Some(Timeout::new(
Duration::from_secs(5), Arbiter::handle()).unwrap());
}
match self.timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(())) => Err(SendRequestError::Timeout),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => unreachable!()
}
}
}
impl Future for SendRequest {
@ -135,8 +122,6 @@ impl Future for SendRequest {
type Error = SendRequestError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_timeout()?;
loop {
let state = mem::replace(&mut self.state, State::None);
@ -170,6 +155,10 @@ impl Future for SendRequest {
_ => IoBody::Done,
};
let timeout = self.timeout.take().unwrap_or_else(||
Timeout::new(
Duration::from_secs(5), Arbiter::handle()).unwrap());
let pl = Box::new(Pipeline {
body, writer,
conn: Some(conn),
@ -180,6 +169,7 @@ impl Future for SendRequest {
decompress: None,
should_decompress: self.req.response_decompress(),
write_state: RunningState::Running,
timeout: Some(timeout),
});
self.state = State::Send(pl);
},
@ -218,6 +208,7 @@ pub(crate) struct Pipeline {
decompress: Option<PayloadStream>,
should_decompress: bool,
write_state: RunningState,
timeout: Option<Timeout>,
}
enum IoBody {
@ -292,10 +283,14 @@ impl Pipeline {
let mut need_run = false;
// need write?
if let Async::NotReady = self.poll_write()
match self.poll_write()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))?
{
need_run = true;
Async::NotReady => need_run = true,
Async::Ready(_) => {
let _ = self.poll_timeout()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))?;
}
}
// need read?
@ -343,6 +338,18 @@ impl Pipeline {
}
}
fn poll_timeout(&mut self) -> Poll<(), SendRequestError> {
if self.timeout.is_some() {
match self.timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(())) => Err(SendRequestError::Timeout),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => unreachable!()
}
} else {
Ok(Async::NotReady)
}
}
#[inline]
fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done || self.conn.is_none() {
@ -350,7 +357,6 @@ impl Pipeline {
}
let mut done = false;
if self.drain.is_none() && self.write_state != RunningState::Paused {
'outter: loop {
let result = match mem::replace(&mut self.body, IoBody::Done) {

View File

@ -69,7 +69,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if !self.node.is_none() {
if self.node.is_some() {
let el = self as *mut _;
self.node = Some(Node::new(el));
let _ = match self.proto {