diff --git a/Cargo.toml b/Cargo.toml index 074e53631..80d245951 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,9 +45,9 @@ rust-tls = ["rustls", "actix-net/rust-tls"] [dependencies] actix = "0.7.5" -#actix-net = "0.2.0" -#actix-net = { git="https://github.com/actix/actix-net.git" } -actix-net = { path="../actix-net" } +#actix-net = "0.2.2" +actix-net = { git="https://github.com/actix/actix-net.git" } +#actix-net = { path="../actix-net" } base64 = "0.9" bitflags = "1.0" diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 4081b6354..24ec8366d 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -40,11 +40,12 @@ where .into_future() .map_err(|(e, _)| SendRequestError::from(e)) .and_then(|(item, framed)| { - if let Some(item) = item { - let mut res = item.into_item().unwrap(); + if let Some(res) = item { match framed.get_codec().message_type() { h1::MessageType::None => release_connection(framed), - _ => *res.payload.borrow_mut() = Some(Payload::stream(framed)), + _ => { + *res.payload.borrow_mut() = Some(Payload::stream(framed)) + } } ok(res) } else { @@ -92,21 +93,14 @@ where && !self.framed.as_ref().unwrap().is_write_buf_full() { match self.body.as_mut().unwrap().poll_next()? { - Async::Ready(None) => { + Async::Ready(item) => { self.flushed = false; self.framed .as_mut() .unwrap() - .start_send(h1::Message::Chunk(None))?; + .force_send(h1::Message::Chunk(item))?; break; } - Async::Ready(Some(chunk)) => { - self.flushed = false; - self.framed - .as_mut() - .unwrap() - .start_send(h1::Message::Chunk(Some(chunk)))?; - } Async::NotReady => body_ready = false, } } diff --git a/src/h1/client.rs b/src/h1/client.rs index eb7bdc233..d2ac20348 100644 --- a/src/h1/client.rs +++ b/src/h1/client.rs @@ -178,17 +178,13 @@ impl ClientCodecInner { } impl Decoder for ClientCodec { - type Item = Message; + type Item = ClientResponse; type Error = ParseError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if self.inner.payload.is_some() { - Ok(match self.inner.payload.as_mut().unwrap().decode(src)? { - Some(PayloadItem::Chunk(chunk)) => Some(Message::Chunk(Some(chunk))), - Some(PayloadItem::Eof) => Some(Message::Chunk(None)), - None => None, - }) - } else if let Some((req, payload)) = self.inner.decoder.decode(src)? { + debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set"); + + if let Some((req, payload)) = self.inner.decoder.decode(src)? { self.inner .flags .set(Flags::HEAD, req.inner.method == Method::HEAD); @@ -204,7 +200,7 @@ impl Decoder for ClientCodec { self.inner.flags.insert(Flags::STREAM); } }; - Ok(Some(Message::Item(req))) + Ok(Some(req)) } else { Ok(None) } @@ -216,7 +212,7 @@ impl Decoder for ClientPayloadCodec { type Error = PayloadError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - assert!( + debug_assert!( self.inner.payload.is_some(), "Payload decoder is not specified" ); diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 71d6435c1..470d3df93 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -64,7 +64,7 @@ enum State { None, ServiceCall(S::Future), SendResponse(Option<(Message, Body)>), - SendPayload(Option, Option>), + SendPayload(BodyStream), } impl State { @@ -204,21 +204,23 @@ where // send respons State::SendResponse(ref mut item) => { let (msg, body) = item.take().expect("SendResponse is empty"); - match self.framed.as_mut().unwrap().start_send(msg) { + let framed = self.framed.as_mut().unwrap(); + match framed.start_send(msg) { Ok(AsyncSink::Ready) => { - self.flags.set( - Flags::KEEPALIVE, - self.framed.as_mut().unwrap().get_codec().keepalive(), - ); + self.flags + .set(Flags::KEEPALIVE, framed.get_codec().keepalive()); self.flags.remove(Flags::FLUSHED); match body { Body::Empty => Some(State::None), - Body::Binary(mut bin) => Some(State::SendPayload( - None, - Some(Message::Chunk(Some(bin.take()))), - )), Body::Streaming(stream) => { - Some(State::SendPayload(Some(stream), None)) + Some(State::SendPayload(stream)) + } + Body::Binary(mut bin) => { + self.flags.remove(Flags::FLUSHED); + framed + .force_send(Message::Chunk(Some(bin.take())))?; + framed.force_send(Message::Chunk(None))?; + Some(State::None) } } } @@ -235,51 +237,28 @@ where } } // Send payload - State::SendPayload(ref mut stream, ref mut bin) => { - println!("SEND payload"); - if let Some(item) = bin.take() { - let mut framed = self.framed.as_mut().unwrap(); - if framed.is_ - match self.framed.as_mut().unwrap().start_send(item) { - Ok(AsyncSink::Ready) => { - self.flags.remove(Flags::FLUSHED); - } - Ok(AsyncSink::NotReady(item)) => { - *bin = Some(item); - return Ok(()); - } - Err(err) => return Err(DispatchError::Io(err)), - } - } - if let Some(ref mut stream) = stream { - match stream.poll() { - Ok(Async::Ready(Some(item))) => match self - .framed - .as_mut() - .unwrap() - .start_send(Message::Chunk(Some(item))) - { - Ok(AsyncSink::Ready) => { + State::SendPayload(ref mut stream) => { + let mut framed = self.framed.as_mut().unwrap(); + loop { + if !framed.is_write_buf_full() { + match stream.poll().map_err(|_| DispatchError::Unknown)? { + Async::Ready(Some(item)) => { self.flags.remove(Flags::FLUSHED); + framed.force_send(Message::Chunk(Some(item)))?; continue; } - Ok(AsyncSink::NotReady(msg)) => { - *bin = Some(msg); - return Ok(()); + Async::Ready(None) => { + self.flags.remove(Flags::FLUSHED); + framed.force_send(Message::Chunk(None))?; } - Err(err) => return Err(DispatchError::Io(err)), - }, - Ok(Async::Ready(None)) => Some(State::SendPayload( - None, - Some(Message::Chunk(None)), - )), - Ok(Async::NotReady) => return Ok(()), - // Err(err) => return Err(DispatchError::Io(err)), - Err(_) => return Err(DispatchError::Unknown), + Async::NotReady => return Ok(()), + } + } else { + return Ok(()); } - } else { - Some(State::None) + break; } + None } }; diff --git a/src/h1/mod.rs b/src/h1/mod.rs index 21261e99c..818387004 100644 --- a/src/h1/mod.rs +++ b/src/h1/mod.rs @@ -33,15 +33,6 @@ pub enum Message { Chunk(Option), } -impl Message { - pub fn into_item(self) -> Option { - match self { - Message::Item(item) => Some(item), - _ => None, - } - } -} - impl From for Message { fn from(item: T) -> Self { Message::Item(item) diff --git a/src/ws/client/service.rs b/src/ws/client/service.rs index 80cf98684..34a151444 100644 --- a/src/ws/client/service.rs +++ b/src/ws/client/service.rs @@ -172,10 +172,7 @@ where { fut: Box< Future< - Item = ( - Option>, - Framed, - ), + Item = (Option, Framed), Error = ClientError, >, >, @@ -196,8 +193,7 @@ where let (item, framed) = try_ready!(self.fut.poll()); let res = match item { - Some(h1::Message::Item(res)) => res, - Some(h1::Message::Chunk(_)) => unreachable!(), + Some(res) => res, None => return Err(ClientError::Disconnected), };