1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-25 22:49:21 +02:00

add expect: 100-continue support #141

This commit is contained in:
Nikolay Kim
2019-04-05 16:46:44 -07:00
parent 02fcaca3da
commit fbedaec661
18 changed files with 554 additions and 156 deletions

View File

@ -154,33 +154,37 @@ impl Encoder for Codec {
) -> Result<(), Self::Error> {
match item {
Message::Item((mut res, length)) => {
// set response version
res.head_mut().version = self.version;
// connection status
self.ctype = if let Some(ct) = res.head().ctype() {
if ct == ConnectionType::KeepAlive {
self.ctype
} else {
ct
}
if res.head().status == StatusCode::CONTINUE {
dst.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
} else {
self.ctype
};
// set response version
res.head_mut().version = self.version;
// encode message
let len = dst.len();
self.encoder.encode(
dst,
&mut res,
self.flags.contains(Flags::HEAD),
self.flags.contains(Flags::STREAM),
self.version,
length,
self.ctype,
&self.config,
)?;
self.headers_size = (dst.len() - len) as u32;
// connection status
self.ctype = if let Some(ct) = res.head().ctype() {
if ct == ConnectionType::KeepAlive {
self.ctype
} else {
ct
}
} else {
self.ctype
};
// encode message
let len = dst.len();
self.encoder.encode(
dst,
&mut res,
self.flags.contains(Flags::HEAD),
self.flags.contains(Flags::STREAM),
self.version,
length,
self.ctype,
&self.config,
)?;
self.headers_size = (dst.len() - len) as u32;
}
}
Message::Chunk(Some(bytes)) => {
self.encoder.encode_chunk(bytes.as_ref(), dst)?;

View File

@ -51,6 +51,8 @@ pub(crate) enum PayloadLength {
pub(crate) trait MessageType: Sized {
fn set_connection_type(&mut self, ctype: Option<ConnectionType>);
fn set_expect(&mut self);
fn headers_mut(&mut self) -> &mut HeaderMap;
fn decode(src: &mut BytesMut) -> Result<Option<(Self, PayloadType)>, ParseError>;
@ -62,6 +64,7 @@ pub(crate) trait MessageType: Sized {
) -> Result<PayloadLength, ParseError> {
let mut ka = None;
let mut has_upgrade = false;
let mut expect = false;
let mut chunked = false;
let mut content_length = None;
@ -126,6 +129,12 @@ pub(crate) trait MessageType: Sized {
}
}
}
header::EXPECT => {
let bytes = value.as_bytes();
if bytes.len() >= 4 && &bytes[0..4] == b"100-" {
expect = true;
}
}
_ => (),
}
@ -136,6 +145,9 @@ pub(crate) trait MessageType: Sized {
}
}
self.set_connection_type(ka);
if expect {
self.set_expect()
}
// https://tools.ietf.org/html/rfc7230#section-3.3.3
if chunked {
@ -163,6 +175,10 @@ impl MessageType for Request {
}
}
fn set_expect(&mut self) {
self.head_mut().set_expect();
}
fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.head_mut().headers
}
@ -235,6 +251,8 @@ impl MessageType for ResponseHead {
}
}
fn set_expect(&mut self) {}
fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.headers
}

View File

@ -1,5 +1,4 @@
use std::collections::VecDeque;
use std::fmt::Debug;
use std::mem;
use std::time::Instant;
@ -13,8 +12,9 @@ use tokio_timer::Delay;
use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::config::ServiceConfig;
use crate::error::DispatchError;
use crate::error::{DispatchError, Error};
use crate::error::{ParseError, PayloadError};
use crate::http::StatusCode;
use crate::request::Request;
use crate::response::Response;
@ -37,24 +37,33 @@ bitflags! {
}
/// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S: Service<Request = Request>, B: MessageBody>
pub struct Dispatcher<T, S, B, X>
where
S::Error: Debug,
S: Service<Request = Request>,
S::Error: Into<Error>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
{
inner: Option<InnerDispatcher<T, S, B>>,
inner: Option<InnerDispatcher<T, S, B, X>>,
}
struct InnerDispatcher<T, S: Service<Request = Request>, B: MessageBody>
struct InnerDispatcher<T, S, B, X>
where
S::Error: Debug,
S: Service<Request = Request>,
S::Error: Into<Error>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
{
service: CloneableService<S>,
expect: CloneableService<X>,
flags: Flags,
framed: Framed<T, Codec>,
error: Option<DispatchError>,
config: ServiceConfig,
state: State<S, B>,
state: State<S, B, X>,
payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>,
@ -67,13 +76,24 @@ enum DispatcherMessage {
Error(Response<()>),
}
enum State<S: Service<Request = Request>, B: MessageBody> {
enum State<S, B, X>
where
S: Service<Request = Request>,
X: Service<Request = Request, Response = Request>,
B: MessageBody,
{
None,
ExpectCall(X::Future),
ServiceCall(S::Future),
SendPayload(ResponseBody<B>),
}
impl<S: Service<Request = Request>, B: MessageBody> State<S, B> {
impl<S, B, X> State<S, B, X>
where
S: Service<Request = Request>,
X: Service<Request = Request, Response = Request>,
B: MessageBody,
{
fn is_empty(&self) -> bool {
if let State::None = self {
true
@ -83,21 +103,29 @@ impl<S: Service<Request = Request>, B: MessageBody> State<S, B> {
}
}
impl<T, S, B> Dispatcher<T, S, B>
impl<T, S, B, X> Dispatcher<T, S, B, X>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
{
/// Create http/1 dispatcher.
pub fn new(stream: T, config: ServiceConfig, service: CloneableService<S>) -> Self {
pub fn new(
stream: T,
config: ServiceConfig,
service: CloneableService<S>,
expect: CloneableService<X>,
) -> Self {
Dispatcher::with_timeout(
Framed::new(stream, Codec::new(config.clone())),
config,
None,
service,
expect,
)
}
@ -107,6 +135,7 @@ where
config: ServiceConfig,
timeout: Option<Delay>,
service: CloneableService<S>,
expect: CloneableService<X>,
) -> Self {
let keepalive = config.keep_alive_enabled();
let flags = if keepalive {
@ -132,6 +161,7 @@ where
error: None,
messages: VecDeque::new(),
service,
expect,
flags,
config,
ka_expire,
@ -141,13 +171,15 @@ where
}
}
impl<T, S, B> InnerDispatcher<T, S, B>
impl<T, S, B, X> InnerDispatcher<T, S, B, X>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
{
fn can_read(&self) -> bool {
if self.flags.contains(Flags::DISCONNECTED) {
@ -195,7 +227,7 @@ where
&mut self,
message: Response<()>,
body: ResponseBody<B>,
) -> Result<State<S, B>, DispatchError> {
) -> Result<State<S, B, X>, DispatchError> {
self.framed
.force_send(Message::Item((message, body.length())))
.map_err(|err| {
@ -213,6 +245,15 @@ where
}
}
fn send_continue(&mut self) -> Result<(), DispatchError> {
self.framed
.force_send(Message::Item((
Response::empty(StatusCode::CONTINUE),
BodySize::Empty,
)))
.map_err(|err| DispatchError::Io(err))
}
fn poll_response(&mut self) -> Result<(), DispatchError> {
let mut retry = self.can_read();
loop {
@ -227,6 +268,22 @@ where
}
None => None,
},
State::ExpectCall(mut fut) => match fut.poll() {
Ok(Async::Ready(req)) => {
self.send_continue()?;
Some(State::ServiceCall(self.service.call(req)))
}
Ok(Async::NotReady) => {
self.state = State::ExpectCall(fut);
None
}
Err(e) => {
let e = e.into();
let res: Response = e.into();
let (res, body) = res.replace_body(());
Some(self.send_response(res, body.into_body())?)
}
},
State::ServiceCall(mut fut) => match fut.poll() {
Ok(Async::Ready(res)) => {
let (res, body) = res.into().replace_body(());
@ -289,7 +346,28 @@ where
Ok(())
}
fn handle_request(&mut self, req: Request) -> Result<State<S, B>, DispatchError> {
fn handle_request(&mut self, req: Request) -> Result<State<S, B, X>, DispatchError> {
// Handle `EXPECT: 100-Continue` header
let req = if req.head().expect() {
let mut task = self.expect.call(req);
match task.poll() {
Ok(Async::Ready(req)) => {
self.send_continue()?;
req
}
Ok(Async::NotReady) => return Ok(State::ExpectCall(task)),
Err(e) => {
let e = e.into();
let res: Response = e.into();
let (res, body) = res.replace_body(());
return self.send_response(res, body.into_body());
}
}
} else {
req
};
// Call service
let mut task = self.service.call(req);
match task.poll() {
Ok(Async::Ready(res)) => {
@ -329,10 +407,6 @@ where
req = req1;
self.payload = Some(ps);
}
//MessageType::Stream => {
// self.unhandled = Some(req);
// return Ok(updated);
//}
_ => (),
}
@ -482,13 +556,15 @@ where
}
}
impl<T, S, B> Future for Dispatcher<T, S, B>
impl<T, S, B, X> Future for Dispatcher<T, S, B, X>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
{
type Item = ();
type Error = DispatchError;
@ -558,6 +634,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::h1::ExpectHandler;
struct Buffer {
buf: Bytes,
@ -620,6 +697,7 @@ mod tests {
CloneableService::new(
(|_| ok::<_, Error>(Response::Ok().finish())).into_service(),
),
CloneableService::new(ExpectHandler),
);
assert!(h1.poll().is_ok());
assert!(h1.poll().is_ok());

View File

@ -0,0 +1,36 @@
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use crate::error::Error;
use crate::request::Request;
pub struct ExpectHandler;
impl NewService for ExpectHandler {
type Request = Request;
type Response = Request;
type Error = Error;
type Service = ExpectHandler;
type InitError = Error;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(ExpectHandler)
}
}
impl Service for ExpectHandler {
type Request = Request;
type Response = Request;
type Error = Error;
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Request) -> Self::Future {
ok(req)
}
}

View File

@ -6,12 +6,14 @@ mod codec;
mod decoder;
mod dispatcher;
mod encoder;
mod expect;
mod payload;
mod service;
pub use self::client::{ClientCodec, ClientPayloadCodec};
pub use self::codec::Codec;
pub use self::dispatcher::Dispatcher;
pub use self::expect::ExpectHandler;
pub use self::payload::{Payload, PayloadWriter};
pub use self::service::{H1Service, H1ServiceHandler, OneRequest};

View File

@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::fmt;
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
@ -10,25 +10,27 @@ use futures::{try_ready, Async, Future, IntoFuture, Poll, Stream};
use crate::body::MessageBody;
use crate::config::{KeepAlive, ServiceConfig};
use crate::error::{DispatchError, ParseError};
use crate::error::{DispatchError, Error, ParseError};
use crate::request::Request;
use crate::response::Response;
use super::codec::Codec;
use super::dispatcher::Dispatcher;
use super::Message;
use super::{ExpectHandler, Message};
/// `NewService` implementation for HTTP1 transport
pub struct H1Service<T, P, S, B> {
pub struct H1Service<T, P, S, B, X = ExpectHandler> {
srv: S,
cfg: ServiceConfig,
expect: X,
_t: PhantomData<(T, P, B)>,
}
impl<T, P, S, B> H1Service<T, P, S, B>
where
S: NewService<SrvConfig, Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
{
@ -39,6 +41,7 @@ where
H1Service {
cfg,
srv: service.into_new_service(),
expect: ExpectHandler,
_t: PhantomData,
}
}
@ -51,29 +54,59 @@ where
H1Service {
cfg,
srv: service.into_new_service(),
expect: ExpectHandler,
_t: PhantomData,
}
}
}
impl<T, P, S, B> NewService<SrvConfig> for H1Service<T, P, S, B>
impl<T, P, S, B, X> H1Service<T, P, S, B, X>
where
S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
{
pub fn expect<U>(self, expect: U) -> H1Service<T, P, S, B, U>
where
U: NewService<Request = Request, Response = Request>,
U::Error: Into<Error>,
U::InitError: fmt::Debug,
{
H1Service {
expect,
cfg: self.cfg,
srv: self.srv,
_t: PhantomData,
}
}
}
impl<T, P, S, B, X> NewService<SrvConfig> for H1Service<T, P, S, B, X>
where
T: AsyncRead + AsyncWrite,
S: NewService<SrvConfig, Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
X: NewService<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
{
type Request = Io<T, P>;
type Response = ();
type Error = DispatchError;
type InitError = S::InitError;
type Service = H1ServiceHandler<T, P, S::Service, B>;
type Future = H1ServiceResponse<T, P, S, B>;
type InitError = ();
type Service = H1ServiceHandler<T, P, S::Service, B, X::Service>;
type Future = H1ServiceResponse<T, P, S, B, X>;
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
H1ServiceResponse {
fut: self.srv.new_service(cfg).into_future(),
fut_ex: Some(self.expect.new_service(&())),
expect: None,
cfg: Some(self.cfg.clone()),
_t: PhantomData,
}
@ -81,77 +114,136 @@ where
}
#[doc(hidden)]
pub struct H1ServiceResponse<T, P, S: NewService<SrvConfig, Request = Request>, B> {
fut: <S::Future as IntoFuture>::Future,
pub struct H1ServiceResponse<T, P, S, B, X>
where
S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
X: NewService<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
{
fut: S::Future,
fut_ex: Option<X::Future>,
expect: Option<X::Service>,
cfg: Option<ServiceConfig>,
_t: PhantomData<(T, P, B)>,
}
impl<T, P, S, B> Future for H1ServiceResponse<T, P, S, B>
impl<T, P, S, B, X> Future for H1ServiceResponse<T, P, S, B, X>
where
T: AsyncRead + AsyncWrite,
S: NewService<SrvConfig, Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
X: NewService<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
{
type Item = H1ServiceHandler<T, P, S::Service, B>;
type Error = S::InitError;
type Item = H1ServiceHandler<T, P, S::Service, B, X::Service>;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let service = try_ready!(self.fut.poll());
if let Some(ref mut fut) = self.fut_ex {
let expect = try_ready!(fut
.poll()
.map_err(|e| log::error!("Init http service error: {:?}", e)));
self.expect = Some(expect);
self.fut_ex.take();
}
let service = try_ready!(self
.fut
.poll()
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Ok(Async::Ready(H1ServiceHandler::new(
self.cfg.take().unwrap(),
service,
self.expect.take().unwrap(),
)))
}
}
/// `Service` implementation for HTTP1 transport
pub struct H1ServiceHandler<T, P, S, B> {
pub struct H1ServiceHandler<T, P, S, B, X> {
srv: CloneableService<S>,
expect: CloneableService<X>,
cfg: ServiceConfig,
_t: PhantomData<(T, P, B)>,
}
impl<T, P, S, B> H1ServiceHandler<T, P, S, B>
impl<T, P, S, B, X> H1ServiceHandler<T, P, S, B, X>
where
S: Service<Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
{
fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler<T, P, S, B> {
fn new(cfg: ServiceConfig, srv: S, expect: X) -> H1ServiceHandler<T, P, S, B, X> {
H1ServiceHandler {
srv: CloneableService::new(srv),
expect: CloneableService::new(expect),
cfg,
_t: PhantomData,
}
}
}
impl<T, P, S, B> Service for H1ServiceHandler<T, P, S, B>
impl<T, P, S, B, X> Service for H1ServiceHandler<T, P, S, B, X>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
S::Error: Debug,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
{
type Request = Io<T, P>;
type Response = ();
type Error = DispatchError;
type Future = Dispatcher<T, S, B>;
type Future = Dispatcher<T, S, B, X>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.srv.poll_ready().map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service
})
let ready = self
.expect
.poll_ready()
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready();
let ready = self
.srv
.poll_ready()
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready;
if ready {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
fn call(&mut self, req: Self::Request) -> Self::Future {
Dispatcher::new(req.into_parts().0, self.cfg.clone(), self.srv.clone())
Dispatcher::new(
req.into_parts().0,
self.cfg.clone(),
self.srv.clone(),
self.expect.clone(),
)
}
}