1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

simplify client decoder

This commit is contained in:
Nikolay Kim 2018-11-14 10:52:40 -08:00
parent 6297fe0d41
commit 03ad9a3105
6 changed files with 46 additions and 90 deletions

View File

@ -45,9 +45,9 @@ rust-tls = ["rustls", "actix-net/rust-tls"]
[dependencies] [dependencies]
actix = "0.7.5" actix = "0.7.5"
#actix-net = "0.2.0" #actix-net = "0.2.2"
#actix-net = { git="https://github.com/actix/actix-net.git" } actix-net = { git="https://github.com/actix/actix-net.git" }
actix-net = { path="../actix-net" } #actix-net = { path="../actix-net" }
base64 = "0.9" base64 = "0.9"
bitflags = "1.0" bitflags = "1.0"

View File

@ -40,11 +40,12 @@ where
.into_future() .into_future()
.map_err(|(e, _)| SendRequestError::from(e)) .map_err(|(e, _)| SendRequestError::from(e))
.and_then(|(item, framed)| { .and_then(|(item, framed)| {
if let Some(item) = item { if let Some(res) = item {
let mut res = item.into_item().unwrap();
match framed.get_codec().message_type() { match framed.get_codec().message_type() {
h1::MessageType::None => release_connection(framed), h1::MessageType::None => release_connection(framed),
_ => *res.payload.borrow_mut() = Some(Payload::stream(framed)), _ => {
*res.payload.borrow_mut() = Some(Payload::stream(framed))
}
} }
ok(res) ok(res)
} else { } else {
@ -92,21 +93,14 @@ where
&& !self.framed.as_ref().unwrap().is_write_buf_full() && !self.framed.as_ref().unwrap().is_write_buf_full()
{ {
match self.body.as_mut().unwrap().poll_next()? { match self.body.as_mut().unwrap().poll_next()? {
Async::Ready(None) => { Async::Ready(item) => {
self.flushed = false; self.flushed = false;
self.framed self.framed
.as_mut() .as_mut()
.unwrap() .unwrap()
.start_send(h1::Message::Chunk(None))?; .force_send(h1::Message::Chunk(item))?;
break; break;
} }
Async::Ready(Some(chunk)) => {
self.flushed = false;
self.framed
.as_mut()
.unwrap()
.start_send(h1::Message::Chunk(Some(chunk)))?;
}
Async::NotReady => body_ready = false, Async::NotReady => body_ready = false,
} }
} }

View File

@ -178,17 +178,13 @@ impl ClientCodecInner {
} }
impl Decoder for ClientCodec { impl Decoder for ClientCodec {
type Item = Message<ClientResponse>; type Item = ClientResponse;
type Error = ParseError; type Error = ParseError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if self.inner.payload.is_some() { debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set");
Ok(match self.inner.payload.as_mut().unwrap().decode(src)? {
Some(PayloadItem::Chunk(chunk)) => Some(Message::Chunk(Some(chunk))), if let Some((req, payload)) = self.inner.decoder.decode(src)? {
Some(PayloadItem::Eof) => Some(Message::Chunk(None)),
None => None,
})
} else if let Some((req, payload)) = self.inner.decoder.decode(src)? {
self.inner self.inner
.flags .flags
.set(Flags::HEAD, req.inner.method == Method::HEAD); .set(Flags::HEAD, req.inner.method == Method::HEAD);
@ -204,7 +200,7 @@ impl Decoder for ClientCodec {
self.inner.flags.insert(Flags::STREAM); self.inner.flags.insert(Flags::STREAM);
} }
}; };
Ok(Some(Message::Item(req))) Ok(Some(req))
} else { } else {
Ok(None) Ok(None)
} }
@ -216,7 +212,7 @@ impl Decoder for ClientPayloadCodec {
type Error = PayloadError; type Error = PayloadError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
assert!( debug_assert!(
self.inner.payload.is_some(), self.inner.payload.is_some(),
"Payload decoder is not specified" "Payload decoder is not specified"
); );

View File

@ -64,7 +64,7 @@ enum State<S: Service> {
None, None,
ServiceCall(S::Future), ServiceCall(S::Future),
SendResponse(Option<(Message<Response>, Body)>), SendResponse(Option<(Message<Response>, Body)>),
SendPayload(Option<BodyStream>, Option<Message<Response>>), SendPayload(BodyStream),
} }
impl<S: Service> State<S> { impl<S: Service> State<S> {
@ -204,21 +204,23 @@ where
// send respons // send respons
State::SendResponse(ref mut item) => { State::SendResponse(ref mut item) => {
let (msg, body) = item.take().expect("SendResponse is empty"); let (msg, body) = item.take().expect("SendResponse is empty");
match self.framed.as_mut().unwrap().start_send(msg) { let framed = self.framed.as_mut().unwrap();
match framed.start_send(msg) {
Ok(AsyncSink::Ready) => { Ok(AsyncSink::Ready) => {
self.flags.set( self.flags
Flags::KEEPALIVE, .set(Flags::KEEPALIVE, framed.get_codec().keepalive());
self.framed.as_mut().unwrap().get_codec().keepalive(),
);
self.flags.remove(Flags::FLUSHED); self.flags.remove(Flags::FLUSHED);
match body { match body {
Body::Empty => Some(State::None), Body::Empty => Some(State::None),
Body::Binary(mut bin) => Some(State::SendPayload(
None,
Some(Message::Chunk(Some(bin.take()))),
)),
Body::Streaming(stream) => { Body::Streaming(stream) => {
Some(State::SendPayload(Some(stream), None)) Some(State::SendPayload(stream))
}
Body::Binary(mut bin) => {
self.flags.remove(Flags::FLUSHED);
framed
.force_send(Message::Chunk(Some(bin.take())))?;
framed.force_send(Message::Chunk(None))?;
Some(State::None)
} }
} }
} }
@ -235,51 +237,28 @@ where
} }
} }
// Send payload // Send payload
State::SendPayload(ref mut stream, ref mut bin) => { State::SendPayload(ref mut stream) => {
println!("SEND payload"); let mut framed = self.framed.as_mut().unwrap();
if let Some(item) = bin.take() { loop {
let mut framed = self.framed.as_mut().unwrap(); if !framed.is_write_buf_full() {
if framed.is_ match stream.poll().map_err(|_| DispatchError::Unknown)? {
match self.framed.as_mut().unwrap().start_send(item) { Async::Ready(Some(item)) => {
Ok(AsyncSink::Ready) => {
self.flags.remove(Flags::FLUSHED);
}
Ok(AsyncSink::NotReady(item)) => {
*bin = Some(item);
return Ok(());
}
Err(err) => return Err(DispatchError::Io(err)),
}
}
if let Some(ref mut stream) = stream {
match stream.poll() {
Ok(Async::Ready(Some(item))) => match self
.framed
.as_mut()
.unwrap()
.start_send(Message::Chunk(Some(item)))
{
Ok(AsyncSink::Ready) => {
self.flags.remove(Flags::FLUSHED); self.flags.remove(Flags::FLUSHED);
framed.force_send(Message::Chunk(Some(item)))?;
continue; continue;
} }
Ok(AsyncSink::NotReady(msg)) => { Async::Ready(None) => {
*bin = Some(msg); self.flags.remove(Flags::FLUSHED);
return Ok(()); framed.force_send(Message::Chunk(None))?;
} }
Err(err) => return Err(DispatchError::Io(err)), Async::NotReady => return Ok(()),
}, }
Ok(Async::Ready(None)) => Some(State::SendPayload( } else {
None, return Ok(());
Some(Message::Chunk(None)),
)),
Ok(Async::NotReady) => return Ok(()),
// Err(err) => return Err(DispatchError::Io(err)),
Err(_) => return Err(DispatchError::Unknown),
} }
} else { break;
Some(State::None)
} }
None
} }
}; };

View File

@ -33,15 +33,6 @@ pub enum Message<T> {
Chunk(Option<Bytes>), Chunk(Option<Bytes>),
} }
impl<T> Message<T> {
pub fn into_item(self) -> Option<T> {
match self {
Message::Item(item) => Some(item),
_ => None,
}
}
}
impl<T> From<T> for Message<T> { impl<T> From<T> for Message<T> {
fn from(item: T) -> Self { fn from(item: T) -> Self {
Message::Item(item) Message::Item(item)

View File

@ -172,10 +172,7 @@ where
{ {
fut: Box< fut: Box<
Future< Future<
Item = ( Item = (Option<ClientResponse>, Framed<T, h1::ClientCodec>),
Option<h1::Message<ClientResponse>>,
Framed<T, h1::ClientCodec>,
),
Error = ClientError, Error = ClientError,
>, >,
>, >,
@ -196,8 +193,7 @@ where
let (item, framed) = try_ready!(self.fut.poll()); let (item, framed) = try_ready!(self.fut.poll());
let res = match item { let res = match item {
Some(h1::Message::Item(res)) => res, Some(res) => res,
Some(h1::Message::Chunk(_)) => unreachable!(),
None => return Err(ClientError::Disconnected), None => return Err(ClientError::Disconnected),
}; };