1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

refactor te encoding

This commit is contained in:
Nikolay Kim 2018-10-06 22:36:57 -07:00
parent dda5b399ca
commit b0ca6220f0
4 changed files with 83 additions and 100 deletions

View File

@ -374,7 +374,7 @@ impl ResponseError for cookie::ParseError {
#[derive(Debug)] #[derive(Debug)]
/// A set of errors that can occur during dispatching http requests /// A set of errors that can occur during dispatching http requests
pub enum DispatchError<E: fmt::Debug + fmt::Display> { pub enum DispatchError<E> {
/// Service error /// Service error
// #[fail(display = "Application specific error: {}", _0)] // #[fail(display = "Application specific error: {}", _0)]
Service(E), Service(E),
@ -413,13 +413,13 @@ pub enum DispatchError<E: fmt::Debug + fmt::Display> {
Unknown, Unknown,
} }
impl<E: fmt::Debug + fmt::Display> From<ParseError> for DispatchError<E> { impl<E> From<ParseError> for DispatchError<E> {
fn from(err: ParseError) -> Self { fn from(err: ParseError) -> Self {
DispatchError::Parse(err) DispatchError::Parse(err)
} }
} }
impl<E: fmt::Debug + fmt::Display> From<io::Error> for DispatchError<E> { impl<E> From<io::Error> for DispatchError<E> {
fn from(err: io::Error) -> Self { fn from(err: io::Error) -> Self {
DispatchError::Io(err) DispatchError::Io(err)
} }

View File

@ -54,7 +54,6 @@ pub struct Codec {
// encoder part // encoder part
flags: Flags, flags: Flags,
written: u64,
headers_size: u32, headers_size: u32,
te: ResponseEncoder, te: ResponseEncoder,
} }
@ -82,31 +81,30 @@ impl Codec {
version: Version::HTTP_11, version: Version::HTTP_11,
flags, flags,
written: 0,
headers_size: 0, headers_size: 0,
te: ResponseEncoder::default(), te: ResponseEncoder::default(),
} }
} }
fn written(&self) -> u64 { /// Check if request is upgrade
self.written
}
pub fn upgrade(&self) -> bool { pub fn upgrade(&self) -> bool {
self.flags.contains(Flags::UPGRADE) self.flags.contains(Flags::UPGRADE)
} }
/// Check if last response is keep-alive
pub fn keepalive(&self) -> bool { pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) self.flags.contains(Flags::KEEPALIVE)
} }
/// prepare transfer encoding
pub fn prepare_te(&mut self, res: &mut Response) {
self.te
.update(res, self.flags.contains(Flags::HEAD), self.version);
}
fn encode_response( fn encode_response(
&mut self, mut msg: Response, buffer: &mut BytesMut, &mut self, mut msg: Response, buffer: &mut BytesMut,
) -> io::Result<()> { ) -> io::Result<()> {
// prepare transfer encoding
self.te
.update(&mut msg, self.flags.contains(Flags::HEAD), self.version);
let ka = self.flags.contains(Flags::KEEPALIVE_ENABLED) && msg let ka = self.flags.contains(Flags::KEEPALIVE_ENABLED) && msg
.keep_alive() .keep_alive()
.unwrap_or_else(|| self.flags.contains(Flags::KEEPALIVE)); .unwrap_or_else(|| self.flags.contains(Flags::KEEPALIVE));
@ -131,12 +129,11 @@ impl Codec {
msg.headers_mut() msg.headers_mut()
.insert(CONNECTION, HeaderValue::from_static("close")); .insert(CONNECTION, HeaderValue::from_static("close"));
} }
let body = msg.replace_body(Body::Empty);
// render message // render message
{ {
let reason = msg.reason().as_bytes(); let reason = msg.reason().as_bytes();
if let Body::Binary(ref bytes) = body { if let Body::Binary(ref bytes) = msg.body() {
buffer.reserve( buffer.reserve(
256 + msg.headers().len() * AVERAGE_HEADER_SIZE 256 + msg.headers().len() * AVERAGE_HEADER_SIZE
+ bytes.len() + bytes.len()
@ -229,16 +226,6 @@ impl Codec {
self.headers_size = buffer.len() as u32; self.headers_size = buffer.len() as u32;
} }
if let Body::Binary(bytes) = body {
self.written = bytes.len() as u64;
// buffer.write(bytes.as_ref())?;
buffer.extend_from_slice(bytes.as_ref());
} else {
// capacity, makes sense only for streaming or actor
// self.buffer_capacity = msg.write_buffer_capacity();
msg.replace_body(body);
}
Ok(()) Ok(())
} }
} }
@ -282,7 +269,6 @@ impl Encoder for Codec {
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
match item { match item {
OutMessage::Response(res) => { OutMessage::Response(res) => {
self.written = 0;
self.encode_response(res, dst)?; self.encode_response(res, dst)?;
} }
OutMessage::Payload(bytes) => { OutMessage::Payload(bytes) => {

View File

@ -1,15 +1,15 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt::{Debug, Display};
use std::time::Instant; use std::time::Instant;
use actix_net::codec::Framed; use actix_net::codec::Framed;
use actix_net::service::Service; use actix_net::service::Service;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use log::Level::Debug;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay; use tokio_timer::Delay;
use error::{ParseError, PayloadError}; use error::{Error, ParseError, PayloadError};
use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
use body::Body; use body::Body;
@ -38,7 +38,7 @@ bitflags! {
/// Dispatcher for HTTP/1.1 protocol /// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S: Service> pub struct Dispatcher<T, S: Service>
where where
S::Error: Debug + Display, S::Error: Into<Error>,
{ {
service: S, service: S,
flags: Flags, flags: Flags,
@ -81,7 +81,7 @@ impl<T, S> Dispatcher<T, S>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request = Request, Response = Response>, S: Service<Request = Request, Response = Response>,
S::Error: Debug + Display, S::Error: Into<Error>,
{ {
/// Create http/1 dispatcher. /// Create http/1 dispatcher.
pub fn new(stream: T, config: ServiceConfig, service: S) -> Self { pub fn new(stream: T, config: ServiceConfig, service: S) -> Self {
@ -177,52 +177,34 @@ where
State::None => loop { State::None => loop {
break if let Some(msg) = self.messages.pop_front() { break if let Some(msg) = self.messages.pop_front() {
match msg { match msg {
Message::Item(msg) => { Message::Item(req) => Some(self.handle_request(req)),
let mut task = self.service.call(msg); Message::Error(res) => Some(State::SendResponse(Some(
match task.poll() {
Ok(Async::Ready(res)) => {
if res.body().is_streaming() {
unimplemented!()
} else {
Some(Ok(State::SendResponse(Some(
OutMessage::Response(res),
))))
}
}
Ok(Async::NotReady) => {
Some(Ok(State::Response(task)))
}
Err(err) => Some(Err(DispatchError::Service(err))),
}
}
Message::Error(res) => Some(Ok(State::SendResponse(Some(
OutMessage::Response(res), OutMessage::Response(res),
)))), ))),
} }
} else { } else {
None None
}; };
}, },
State::Payload(ref mut _body) => unimplemented!(), State::Payload(ref mut _body) => unimplemented!(),
State::Response(ref mut fut) => { State::Response(ref mut fut) => match fut.poll() {
match fut.poll() { Ok(Async::Ready(mut res)) => {
Ok(Async::Ready(res)) => { self.framed.get_codec_mut().prepare_te(&mut res);
if res.body().is_streaming() { if res.body().is_streaming() {
unimplemented!() unimplemented!()
} else { } else {
Some(Ok(State::SendResponse(Some( Some(State::SendResponse(Some(OutMessage::Response(res))))
OutMessage::Response(res),
))))
}
}
Ok(Async::NotReady) => None,
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
Some(Err(DispatchError::Service(err)))
} }
} }
} Ok(Async::NotReady) => None,
Err(err) => {
let err = err.into();
if log_enabled!(Debug) {
debug!("{:?}", err);
}
Some(State::SendResponse(Some(OutMessage::Response(err.into()))))
}
},
State::SendResponse(ref mut item) => { State::SendResponse(ref mut item) => {
let msg = item.take().expect("SendResponse is empty"); let msg = item.take().expect("SendResponse is empty");
match self.framed.start_send(msg) { match self.framed.start_send(msg) {
@ -232,13 +214,19 @@ where
self.framed.get_codec().keepalive(), self.framed.get_codec().keepalive(),
); );
self.flags.remove(Flags::FLUSHED); self.flags.remove(Flags::FLUSHED);
Some(Ok(State::None)) Some(State::None)
} }
Ok(AsyncSink::NotReady(msg)) => { Ok(AsyncSink::NotReady(msg)) => {
*item = Some(msg); *item = Some(msg);
return Ok(()); return Ok(());
} }
Err(err) => Some(Err(DispatchError::Io(err))), Err(err) => {
self.flags.insert(Flags::READ_DISCONNECTED);
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::Incomplete);
}
return Err(DispatchError::Io(err));
}
} }
} }
State::SendResponseWithPayload(ref mut item) => { State::SendResponseWithPayload(ref mut item) => {
@ -251,23 +239,25 @@ where
self.framed.get_codec().keepalive(), self.framed.get_codec().keepalive(),
); );
self.flags.remove(Flags::FLUSHED); self.flags.remove(Flags::FLUSHED);
Some(Ok(State::Payload(body))) Some(State::Payload(body))
} }
Ok(AsyncSink::NotReady(msg)) => { Ok(AsyncSink::NotReady(msg)) => {
*item = Some((msg, body)); *item = Some((msg, body));
return Ok(()); return Ok(());
} }
Err(err) => Some(Err(DispatchError::Io(err))), Err(err) => {
self.flags.insert(Flags::READ_DISCONNECTED);
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::Incomplete);
}
return Err(DispatchError::Io(err));
}
} }
} }
}; };
match state { match state {
Some(Ok(state)) => self.state = state, Some(state) => self.state = state,
Some(Err(err)) => {
self.client_disconnected();
return Err(err);
}
None => { None => {
// if read-backpressure is enabled and we consumed some data. // if read-backpressure is enabled and we consumed some data.
// we may read more dataand retry // we may read more dataand retry
@ -283,6 +273,28 @@ where
Ok(()) Ok(())
} }
fn handle_request(&mut self, req: Request) -> State<S> {
let mut task = self.service.call(req);
match task.poll() {
Ok(Async::Ready(mut res)) => {
self.framed.get_codec_mut().prepare_te(&mut res);
if res.body().is_streaming() {
unimplemented!()
} else {
State::SendResponse(Some(OutMessage::Response(res)))
}
}
Ok(Async::NotReady) => State::Response(task),
Err(err) => {
let err = err.into();
if log_enabled!(Debug) {
debug!("{:?}", err);
}
State::SendResponse(Some(OutMessage::Response(err.into())))
}
}
}
fn one_message(&mut self, msg: InMessage) -> Result<(), DispatchError<S::Error>> { fn one_message(&mut self, msg: InMessage) -> Result<(), DispatchError<S::Error>> {
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
@ -290,23 +302,7 @@ where
InMessage::Message(msg) => { InMessage::Message(msg) => {
// handle request early // handle request early
if self.state.is_empty() { if self.state.is_empty() {
let mut task = self.service.call(msg); self.state = self.handle_request(msg);
match task.poll() {
Ok(Async::Ready(res)) => {
if res.body().is_streaming() {
unimplemented!()
} else {
self.state =
State::SendResponse(Some(OutMessage::Response(res)));
}
}
Ok(Async::NotReady) => self.state = State::Response(task),
Err(err) => {
error!("Unhandled application error: {}", err);
self.client_disconnected();
return Err(DispatchError::Service(err));
}
}
} else { } else {
self.messages.push_back(Message::Item(msg)); self.messages.push_back(Message::Item(msg));
} }
@ -449,7 +445,7 @@ impl<T, S> Future for Dispatcher<T, S>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request = Request, Response = Response>, S: Service<Request = Request, Response = Response>,
S::Error: Debug + Display, S::Error: Into<Error>,
{ {
type Item = (); type Item = ();
type Error = DispatchError<S::Error>; type Error = DispatchError<S::Error>;

View File

@ -1,4 +1,3 @@
use std::fmt::{Debug, Display};
use std::marker::PhantomData; use std::marker::PhantomData;
use actix_net::codec::Framed; use actix_net::codec::Framed;
@ -7,7 +6,7 @@ use futures::{future, Async, Future, Poll, Stream};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use config::ServiceConfig; use config::ServiceConfig;
use error::{DispatchError, ParseError}; use error::{DispatchError, Error, ParseError};
use request::Request; use request::Request;
use response::Response; use response::Response;
@ -24,6 +23,8 @@ pub struct H1Service<T, S> {
impl<T, S> H1Service<T, S> impl<T, S> H1Service<T, S>
where where
S: NewService, S: NewService,
S::Service: Clone,
S::Error: Into<Error>,
{ {
/// Create new `HttpService` instance. /// Create new `HttpService` instance.
pub fn new<F: IntoNewService<S>>(cfg: ServiceConfig, service: F) -> Self { pub fn new<F: IntoNewService<S>>(cfg: ServiceConfig, service: F) -> Self {
@ -40,7 +41,7 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: NewService<Request = Request, Response = Response> + Clone, S: NewService<Request = Request, Response = Response> + Clone,
S::Service: Clone, S::Service: Clone,
S::Error: Debug + Display, S::Error: Into<Error>,
{ {
type Request = T; type Request = T;
type Response = (); type Response = ();
@ -69,7 +70,7 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: NewService<Request = Request, Response = Response>, S: NewService<Request = Request, Response = Response>,
S::Service: Clone, S::Service: Clone,
S::Error: Debug + Display, S::Error: Into<Error>,
{ {
type Item = H1ServiceHandler<T, S::Service>; type Item = H1ServiceHandler<T, S::Service>;
type Error = S::InitError; type Error = S::InitError;
@ -93,7 +94,7 @@ pub struct H1ServiceHandler<T, S> {
impl<T, S> H1ServiceHandler<T, S> impl<T, S> H1ServiceHandler<T, S>
where where
S: Service<Request = Request, Response = Response> + Clone, S: Service<Request = Request, Response = Response> + Clone,
S::Error: Debug + Display, S::Error: Into<Error>,
{ {
fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler<T, S> { fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler<T, S> {
H1ServiceHandler { H1ServiceHandler {
@ -108,7 +109,7 @@ impl<T, S> Service for H1ServiceHandler<T, S>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request = Request, Response = Response> + Clone, S: Service<Request = Request, Response = Response> + Clone,
S::Error: Debug + Display, S::Error: Into<Error>,
{ {
type Request = T; type Request = T;
type Response = (); type Response = ();