diff --git a/src/server/settings.rs b/src/config.rs similarity index 89% rename from src/server/settings.rs rename to src/config.rs index b8b7e51f0..508cd5dde 100644 --- a/src/server/settings.rs +++ b/src/config.rs @@ -12,10 +12,10 @@ use time; use tokio_current_thread::spawn; use tokio_timer::{sleep, Delay}; -use super::message::{Request, RequestPool}; -use super::KeepAlive; use body::Body; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; +use request::{Request, RequestPool}; +use server::KeepAlive; // "Sun, 06 Nov 1994 08:49:37 GMT".len() const DATE_VALUE_LENGTH: usize = 29; @@ -28,8 +28,6 @@ struct Inner { client_timeout: u64, client_shutdown: u64, ka_enabled: bool, - bytes: Rc, - messages: &'static RequestPool, date: UnsafeCell<(bool, Date)>, } @@ -60,8 +58,6 @@ impl ServiceConfig { ka_enabled, client_timeout, client_shutdown, - bytes: Rc::new(SharedBytesPool::new()), - messages: RequestPool::pool(), date: UnsafeCell::new((false, Date::new())), })) } @@ -83,23 +79,6 @@ impl ServiceConfig { self.0.ka_enabled } - pub(crate) fn get_bytes(&self) -> BytesMut { - self.0.bytes.get_bytes() - } - - pub(crate) fn release_bytes(&self, bytes: BytesMut) { - self.0.bytes.release_bytes(bytes) - } - - pub(crate) fn get_request(&self) -> Request { - RequestPool::get(self.0.messages) - } - - #[doc(hidden)] - pub fn request_pool(&self) -> &'static RequestPool { - self.0.messages - } - fn update_date(&self) { // Unsafe: WorkerSetting is !Sync and !Send unsafe { (*self.0.date.get()).0 = false }; @@ -341,31 +320,6 @@ impl fmt::Write for Date { } } -#[derive(Debug)] -pub(crate) struct SharedBytesPool(RefCell>); - -impl SharedBytesPool { - pub fn new() -> SharedBytesPool { - SharedBytesPool(RefCell::new(VecDeque::with_capacity(128))) - } - - pub fn get_bytes(&self) -> BytesMut { - if let Some(bytes) = self.0.borrow_mut().pop_front() { - bytes - } else { - BytesMut::new() - } - } - - pub fn release_bytes(&self, mut bytes: BytesMut) { - let v = &mut self.0.borrow_mut(); - if v.len() < 128 { - bytes.clear(); - v.push_front(bytes); - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/error.rs b/src/error.rs index ff2388de8..e39dea9b2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -377,62 +377,56 @@ impl ResponseError for cookie::ParseError { } } -/// A set of errors that can occur during parsing multipart streams -#[derive(Fail, Debug)] -pub enum MultipartError { - /// Content-Type header is not found - #[fail(display = "No Content-type header found")] - NoContentType, - /// Can not parse Content-Type header - #[fail(display = "Can not parse Content-Type header")] - ParseContentType, - /// Multipart boundary is not found - #[fail(display = "Multipart boundary is not found")] - Boundary, - /// Multipart stream is incomplete - #[fail(display = "Multipart stream is incomplete")] - Incomplete, - /// Error during field parsing - #[fail(display = "{}", _0)] - Parse(#[cause] ParseError), - /// Payload error - #[fail(display = "{}", _0)] - Payload(#[cause] PayloadError), +#[derive(Debug)] +/// A set of errors that can occur during dispatching http requests +pub enum DispatchError { + /// Service error + // #[fail(display = "Application specific error: {}", _0)] + Service(E), + + /// An `io::Error` that occurred while trying to read or write to a network + /// stream. + // #[fail(display = "IO error: {}", _0)] + Io(io::Error), + + /// Http request parse error. + // #[fail(display = "Parse error: {}", _0)] + Parse(ParseError), + + /// The first request did not complete within the specified timeout. + // #[fail(display = "The first request did not complete within the specified timeout")] + SlowRequestTimeout, + + /// Shutdown timeout + // #[fail(display = "Connection shutdown timeout")] + ShutdownTimeout, + + /// Payload is not consumed + // #[fail(display = "Task is completed but request's payload is not consumed")] + PayloadIsNotConsumed, + + /// Malformed request + // #[fail(display = "Malformed request")] + MalformedRequest, + + /// Internal error + // #[fail(display = "Internal error")] + InternalError, + + /// Unknown error + // #[fail(display = "Unknown error")] + Unknown, } -impl From for MultipartError { - fn from(err: ParseError) -> MultipartError { - MultipartError::Parse(err) +impl From for DispatchError { + fn from(err: ParseError) -> Self { + DispatchError::Parse(err) } } -impl From for MultipartError { - fn from(err: PayloadError) -> MultipartError { - MultipartError::Payload(err) - } -} - -/// Return `BadRequest` for `MultipartError` -impl ResponseError for MultipartError { - fn error_response(&self) -> HttpResponse { - HttpResponse::new(StatusCode::BAD_REQUEST) - } -} - -/// Error during handling `Expect` header -#[derive(Fail, PartialEq, Debug)] -pub enum ExpectError { - /// Expect header value can not be converted to utf8 - #[fail(display = "Expect header value can not be converted to utf8")] - Encoding, - /// Unknown expect value - #[fail(display = "Unknown expect value")] - UnknownExpect, -} - -impl ResponseError for ExpectError { - fn error_response(&self) -> HttpResponse { - HttpResponse::with_body(StatusCode::EXPECTATION_FAILED, "Unknown Expect") +impl From for DispatchError { + fn from(err: io::Error) -> Self { + DispatchError::Io(err) } } @@ -565,28 +559,6 @@ impl From for ReadlinesError { } } -/// Errors which can occur when attempting to interpret a segment string as a -/// valid path segment. -#[derive(Fail, Debug, PartialEq)] -pub enum UriSegmentError { - /// The segment started with the wrapped invalid character. - #[fail(display = "The segment started with the wrapped invalid character")] - BadStart(char), - /// The segment contained the wrapped invalid character. - #[fail(display = "The segment contained the wrapped invalid character")] - BadChar(char), - /// The segment ended with the wrapped invalid character. - #[fail(display = "The segment ended with the wrapped invalid character")] - BadEnd(char), -} - -/// Return `BadRequest` for `UriSegmentError` -impl ResponseError for UriSegmentError { - fn error_response(&self) -> HttpResponse { - HttpResponse::new(StatusCode::BAD_REQUEST) - } -} - /// Errors which can occur when attempting to generate resource uri. #[derive(Fail, Debug, PartialEq)] pub enum UrlGenerationError { @@ -610,24 +582,6 @@ impl From for UrlGenerationError { } } -/// Errors which can occur when serving static files. -#[derive(Fail, Debug, PartialEq)] -pub enum StaticFileError { - /// Path is not a directory - #[fail(display = "Path is not a directory. Unable to serve static files")] - IsNotDirectory, - /// Cannot render directory - #[fail(display = "Unable to render directory without index file")] - IsDirectory, -} - -/// Return `NotFound` for `StaticFileError` -impl ResponseError for StaticFileError { - fn error_response(&self) -> HttpResponse { - HttpResponse::new(StatusCode::NOT_FOUND) - } -} - /// Helper type that can wrap any error and generate custom response. /// /// In following example any `io::Error` will be converted into "BAD REQUEST" diff --git a/src/server/h1codec.rs b/src/h1/codec.rs similarity index 93% rename from src/server/h1codec.rs rename to src/h1/codec.rs index ea56110d3..011883575 100644 --- a/src/server/h1codec.rs +++ b/src/h1/codec.rs @@ -4,37 +4,45 @@ use std::io::{self, Write}; use bytes::{BufMut, Bytes, BytesMut}; use tokio_codec::{Decoder, Encoder}; -use super::h1decoder::{H1Decoder, Message}; -use super::helpers; -use super::message::RequestPool; -use super::output::{ResponseInfo, ResponseLength}; +use super::decoder::H1Decoder; +pub use super::decoder::InMessage; use body::Body; use error::ParseError; +use helpers; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::Version; use httpresponse::HttpResponse; +use request::RequestPool; +use server::output::{ResponseInfo, ResponseLength}; -pub(crate) enum OutMessage { +pub enum OutMessage { Response(HttpResponse), Payload(Bytes), } -pub(crate) struct H1Codec { +/// HTTP/1 Codec +pub struct Codec { decoder: H1Decoder, encoder: H1Writer, } -impl H1Codec { - pub fn new(pool: &'static RequestPool) -> Self { - H1Codec { +impl Codec { + /// Create HTTP/1 codec + pub fn new() -> Self { + Codec::with_pool(RequestPool::pool()) + } + + /// Create HTTP/1 codec with request's pool + pub(crate) fn with_pool(pool: &'static RequestPool) -> Self { + Codec { decoder: H1Decoder::new(pool), encoder: H1Writer::new(), } } } -impl Decoder for H1Codec { - type Item = Message; +impl Decoder for Codec { + type Item = InMessage; type Error = ParseError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -42,7 +50,7 @@ impl Decoder for H1Codec { } } -impl Encoder for H1Codec { +impl Encoder for Codec { type Item = OutMessage; type Error = io::Error; diff --git a/src/server/h1decoder.rs b/src/h1/decoder.rs similarity index 97% rename from src/server/h1decoder.rs rename to src/h1/decoder.rs index c6f0974aa..47cc5fdf1 100644 --- a/src/server/h1decoder.rs +++ b/src/h1/decoder.rs @@ -4,10 +4,10 @@ use bytes::{Bytes, BytesMut}; use futures::{Async, Poll}; use httparse; -use super::message::{MessageFlags, Request, RequestPool}; use error::ParseError; use http::header::{HeaderName, HeaderValue}; use http::{header, HttpTryFrom, Method, Uri, Version}; +use request::{MessageFlags, Request, RequestPool}; use uri::Url; const MAX_BUFFER_SIZE: usize = 131_072; @@ -19,7 +19,7 @@ pub(crate) struct H1Decoder { } #[derive(Debug)] -pub enum Message { +pub enum InMessage { Message(Request), MessageWithPayload(Request), Chunk(Bytes), @@ -34,14 +34,16 @@ impl H1Decoder { } } - pub fn decode(&mut self, src: &mut BytesMut) -> Result, ParseError> { + pub fn decode( + &mut self, src: &mut BytesMut, + ) -> Result, ParseError> { // read payload if self.decoder.is_some() { match self.decoder.as_mut().unwrap().decode(src)? { - Async::Ready(Some(bytes)) => return Ok(Some(Message::Chunk(bytes))), + Async::Ready(Some(bytes)) => return Ok(Some(InMessage::Chunk(bytes))), Async::Ready(None) => { self.decoder.take(); - return Ok(Some(Message::Eof)); + return Ok(Some(InMessage::Eof)); } Async::NotReady => return Ok(None), } @@ -51,9 +53,9 @@ impl H1Decoder { Async::Ready((msg, decoder)) => { self.decoder = decoder; if self.decoder.is_some() { - Ok(Some(Message::MessageWithPayload(msg))) + Ok(Some(InMessage::MessageWithPayload(msg))) } else { - Ok(Some(Message::Message(msg))) + Ok(Some(InMessage::Message(msg))) } } Async::NotReady => { diff --git a/src/server/h1disp.rs b/src/h1/dispatcher.rs similarity index 86% rename from src/server/h1disp.rs rename to src/h1/dispatcher.rs index b1c2c8a21..6e5672d35 100644 --- a/src/server/h1disp.rs +++ b/src/h1/dispatcher.rs @@ -9,21 +9,20 @@ use actix_net::service::Service; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use tokio_codec::Framed; // use tokio_current_thread::spawn; -use tokio_io::AsyncWrite; +use tokio_io::{AsyncRead, AsyncWrite}; // use tokio_timer::Delay; use error::{ParseError, PayloadError}; use payload::{Payload, PayloadStatus, PayloadWriter}; use body::Body; +use error::DispatchError; use httpresponse::HttpResponse; -use super::error::HttpDispatchError; -use super::h1codec::{H1Codec, OutMessage}; -use super::h1decoder::Message; -use super::input::PayloadType; -use super::message::{Request, RequestPool}; -use super::IoStream; +use request::{Request, RequestPool}; +use server::input::PayloadType; + +use super::codec::{Codec, InMessage, OutMessage}; const MAX_PIPELINED_MESSAGES: usize = 16; @@ -41,15 +40,14 @@ bitflags! { } /// Dispatcher for HTTP/1.1 protocol -pub struct Http1Dispatcher +pub struct Dispatcher where S::Error: Debug + Display, { service: S, flags: Flags, - addr: Option, - framed: Framed, - error: Option>, + framed: Framed, + error: Option>, state: State, payload: Option, @@ -74,26 +72,24 @@ impl State { } } -impl Http1Dispatcher +impl Dispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite, S: Service, S::Error: Debug + Display, { - pub fn new(stream: T, pool: &'static RequestPool, service: S) -> Self { - let addr = stream.peer_addr(); + /// Create http/1 dispatcher. + pub fn new(stream: T, service: S) -> Self { let flags = Flags::FLUSHED; - let codec = H1Codec::new(pool); - let framed = Framed::new(stream, codec); + let framed = Framed::new(stream, Codec::new()); - Http1Dispatcher { + Dispatcher { payload: None, state: State::None, error: None, messages: VecDeque::new(), service, flags, - addr, framed, } } @@ -141,7 +137,7 @@ where } /// Flush stream - fn poll_flush(&mut self) -> Poll<(), HttpDispatchError> { + fn poll_flush(&mut self) -> Poll<(), DispatchError> { if self.flags.contains(Flags::STARTED) && !self.flags.contains(Flags::FLUSHED) { match self.framed.poll_complete() { Ok(Async::NotReady) => Ok(Async::NotReady), @@ -153,7 +149,7 @@ where Ok(Async::Ready(_)) => { // if payload is not consumed we can not use connection if self.payload.is_some() && self.state.is_empty() { - return Err(HttpDispatchError::PayloadIsNotConsumed); + return Err(DispatchError::PayloadIsNotConsumed); } self.flags.insert(Flags::FLUSHED); Ok(Async::Ready(())) @@ -164,7 +160,7 @@ where } } - pub(self) fn poll_handler(&mut self) -> Result<(), HttpDispatchError> { + pub(self) fn poll_handler(&mut self) -> Result<(), DispatchError> { self.poll_io()?; let mut retry = self.can_read(); @@ -185,7 +181,7 @@ where } } Ok(Async::NotReady) => Some(Ok(State::Response(task))), - Err(err) => Some(Err(HttpDispatchError::App(err))), + Err(err) => Some(Err(DispatchError::Service(err))), } } else { None @@ -207,7 +203,7 @@ where Err(err) => { // it is not possible to recover from error // during pipe handling, so just drop connection - Some(Err(HttpDispatchError::App(err))) + Some(Err(DispatchError::Service(err))) } } } @@ -222,7 +218,7 @@ where *item = Some(msg); return Ok(()); } - Err(err) => Some(Err(HttpDispatchError::Io(err))), + Err(err) => Some(Err(DispatchError::Io(err))), } } State::SendResponseWithPayload(ref mut item) => { @@ -236,7 +232,7 @@ where *item = Some((msg, body)); return Ok(()); } - Err(err) => Some(Err(HttpDispatchError::Io(err))), + Err(err) => Some(Err(DispatchError::Io(err))), } } }; @@ -263,14 +259,11 @@ where Ok(()) } - fn one_message(&mut self, msg: Message) -> Result<(), HttpDispatchError> { + fn one_message(&mut self, msg: InMessage) -> Result<(), DispatchError> { self.flags.insert(Flags::STARTED); match msg { - Message::Message(mut msg) => { - // set remote addr - msg.inner_mut().addr = self.addr; - + InMessage::Message(msg) => { // handle request early if self.state.is_empty() { let mut task = self.service.call(msg); @@ -287,17 +280,14 @@ where Err(err) => { error!("Unhandled application error: {}", err); self.client_disconnected(false); - return Err(HttpDispatchError::App(err)); + return Err(DispatchError::Service(err)); } } } else { self.messages.push_back(msg); } } - Message::MessageWithPayload(mut msg) => { - // set remote addr - msg.inner_mut().addr = self.addr; - + InMessage::MessageWithPayload(msg) => { // payload let (ps, pl) = Payload::new(false); *msg.inner.payload.borrow_mut() = Some(pl); @@ -305,24 +295,24 @@ where self.messages.push_back(msg); } - Message::Chunk(chunk) => { + InMessage::Chunk(chunk) => { if let Some(ref mut payload) = self.payload { payload.feed_data(chunk); } else { error!("Internal server error: unexpected payload chunk"); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); // self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); - self.error = Some(HttpDispatchError::InternalError); + self.error = Some(DispatchError::InternalError); } } - Message::Eof => { + InMessage::Eof => { if let Some(mut payload) = self.payload.take() { payload.feed_eof(); } else { error!("Internal server error: unexpected eof"); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); // self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); - self.error = Some(HttpDispatchError::InternalError); + self.error = Some(DispatchError::InternalError); } } } @@ -330,7 +320,7 @@ where Ok(()) } - pub(self) fn poll_io(&mut self) -> Result> { + pub(self) fn poll_io(&mut self) -> Result> { let mut updated = false; if self.messages.len() < MAX_PIPELINED_MESSAGES { @@ -359,7 +349,7 @@ where // Malformed requests should be responded with 400 // self.push_response_entry(StatusCode::BAD_REQUEST); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); - self.error = Some(HttpDispatchError::MalformedRequest); + self.error = Some(DispatchError::MalformedRequest); break; } } @@ -370,14 +360,14 @@ where } } -impl Future for Http1Dispatcher +impl Future for Dispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite, S: Service, S::Error: Debug + Display, { type Item = (); - type Error = HttpDispatchError; + type Error = DispatchError; #[inline] fn poll(&mut self) -> Poll<(), Self::Error> { diff --git a/src/h1/mod.rs b/src/h1/mod.rs new file mode 100644 index 000000000..245f2fc23 --- /dev/null +++ b/src/h1/mod.rs @@ -0,0 +1,9 @@ +//! HTTP/1 implementation +mod codec; +mod decoder; +mod dispatcher; +mod service; + +pub use self::codec::Codec; +pub use self::dispatcher::Dispatcher; +pub use self::service::{H1Service, H1ServiceHandler}; diff --git a/src/h1/service.rs b/src/h1/service.rs new file mode 100644 index 000000000..3017a3ef7 --- /dev/null +++ b/src/h1/service.rs @@ -0,0 +1,125 @@ +use std::fmt::{Debug, Display}; +use std::marker::PhantomData; +use std::time::Duration; + +use actix_net::service::{IntoNewService, NewService, Service}; +use futures::future::{ok, FutureResult}; +use futures::{Async, Future, Poll}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use config::ServiceConfig; +use error::DispatchError; +use httpresponse::HttpResponse; +use request::Request; + +use super::dispatcher::Dispatcher; + +/// `NewService` implementation for HTTP1 transport +pub struct H1Service { + srv: S, + cfg: ServiceConfig, + _t: PhantomData, +} + +impl H1Service +where + S: NewService, +{ + /// Create new `HttpService` instance. + pub fn new>(cfg: ServiceConfig, service: F) -> Self { + H1Service { + cfg, + srv: service.into_new_service(), + _t: PhantomData, + } + } +} + +impl NewService for H1Service +where + T: AsyncRead + AsyncWrite, + S: NewService + Clone, + S::Service: Clone, + S::Error: Debug + Display, +{ + type Request = T; + type Response = (); + type Error = DispatchError; + type InitError = S::InitError; + type Service = H1ServiceHandler; + type Future = H1ServiceResponse; + + fn new_service(&self) -> Self::Future { + H1ServiceResponse { + fut: self.srv.new_service(), + cfg: Some(self.cfg.clone()), + _t: PhantomData, + } + } +} + +pub struct H1ServiceResponse { + fut: S::Future, + cfg: Option, + _t: PhantomData, +} + +impl Future for H1ServiceResponse +where + T: AsyncRead + AsyncWrite, + S: NewService, + S::Service: Clone, + S::Error: Debug + Display, +{ + type Item = H1ServiceHandler; + type Error = S::InitError; + + fn poll(&mut self) -> Poll { + let service = try_ready!(self.fut.poll()); + Ok(Async::Ready(H1ServiceHandler::new( + self.cfg.take().unwrap(), + service, + ))) + } +} + +/// `Service` implementation for HTTP1 transport +pub struct H1ServiceHandler { + srv: S, + cfg: ServiceConfig, + _t: PhantomData, +} + +impl H1ServiceHandler +where + S: Service + Clone, + S::Error: Debug + Display, +{ + fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler { + H1ServiceHandler { + srv, + cfg, + _t: PhantomData, + } + } +} + +impl Service for H1ServiceHandler +where + T: AsyncRead + AsyncWrite, + S: Service + Clone, + S::Error: Debug + Display, +{ + type Request = T; + type Response = (); + type Error = DispatchError; + type Future = Dispatcher; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.srv.poll_ready().map_err(|e| DispatchError::Service(e)) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + Dispatcher::new(req, self.srv.clone()) + } +} diff --git a/src/server/helpers.rs b/src/helpers.rs similarity index 100% rename from src/server/helpers.rs rename to src/helpers.rs diff --git a/src/lib.rs b/src/lib.rs index efd566187..ec86f0320 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -135,6 +135,7 @@ extern crate actix_net; extern crate serde_derive; mod body; +mod config; mod extensions; mod header; mod httpcodes; @@ -142,9 +143,12 @@ mod httpmessage; mod httpresponse; mod json; mod payload; +mod request; mod uri; pub mod error; +pub mod h1; +pub(crate) mod helpers; pub mod server; //pub mod test; //pub mod ws; @@ -152,10 +156,11 @@ pub use body::{Binary, Body}; pub use error::{Error, ResponseError, Result}; pub use extensions::Extensions; pub use httpmessage::HttpMessage; -//pub use httprequest::HttpRequest; pub use httpresponse::HttpResponse; pub use json::Json; -pub use server::Request; +pub use request::Request; + +pub use self::config::{ServiceConfig, ServiceConfigBuilder}; pub mod dev { //! The `actix-web` prelude for library developers diff --git a/src/server/message.rs b/src/request.rs similarity index 87% rename from src/server/message.rs rename to src/request.rs index c39302bab..a75fda3a0 100644 --- a/src/server/message.rs +++ b/src/request.rs @@ -30,7 +30,6 @@ pub(crate) struct InnerRequest { pub(crate) flags: Cell, pub(crate) headers: HeaderMap, pub(crate) extensions: RefCell, - pub(crate) addr: Option, pub(crate) payload: RefCell>, pub(crate) stream_extensions: Option>, pool: &'static RequestPool, @@ -65,8 +64,13 @@ impl HttpMessage for Request { } impl Request { - /// Create new RequestContext instance - pub(crate) fn new(pool: &'static RequestPool) -> Request { + /// Create new Request instance + pub fn new() -> Request { + Request::with_pool(RequestPool::pool()) + } + + /// Create new Request instance with pool + pub(crate) fn with_pool(pool: &'static RequestPool) -> Request { Request { inner: Rc::new(InnerRequest { pool, @@ -75,7 +79,6 @@ impl Request { version: Version::HTTP_11, headers: HeaderMap::with_capacity(16), flags: Cell::new(MessageFlags::empty()), - addr: None, payload: RefCell::new(None), extensions: RefCell::new(Extensions::new()), stream_extensions: None, @@ -134,14 +137,6 @@ impl Request { &mut self.inner_mut().headers } - /// Peer socket address - /// - /// Peer address is actual socket address, if proxy is used in front of - /// actix http server, then peer address would be address of this proxy. - pub fn peer_addr(&self) -> Option { - self.inner().addr - } - /// Checks if a connection should be kept alive. #[inline] pub fn keep_alive(&self) -> bool { @@ -170,12 +165,6 @@ impl Request { self.inner().method == Method::CONNECT } - /// Io stream extensions - #[inline] - pub fn stream_extensions(&self) -> Option<&Extensions> { - self.inner().stream_extensions.as_ref().map(|e| e.as_ref()) - } - pub(crate) fn clone(&self) -> Self { Request { inner: self.inner.clone(), @@ -213,7 +202,8 @@ impl fmt::Debug for Request { } } -pub struct RequestPool(RefCell>>); +/// Request's objects pool +pub(crate) struct RequestPool(RefCell>>); thread_local!(static POOL: &'static RequestPool = RequestPool::create()); @@ -223,16 +213,18 @@ impl RequestPool { Box::leak(Box::new(pool)) } - pub(crate) fn pool() -> &'static RequestPool { + /// Get default request's pool + pub fn pool() -> &'static RequestPool { POOL.with(|p| *p) } + /// Get Request object #[inline] pub fn get(pool: &'static RequestPool) -> Request { if let Some(msg) = pool.0.borrow_mut().pop_front() { Request { inner: msg } } else { - Request::new(pool) + Request::with_pool(pool) } } diff --git a/src/server/error.rs b/src/server/error.rs deleted file mode 100644 index 7d5c67d1e..000000000 --- a/src/server/error.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::fmt::{Debug, Display}; -use std::io; - -use futures::{Async, Poll}; - -use error::{Error, ParseError}; -use http::{StatusCode, Version}; - -/// Errors produced by `AcceptorError` service. -#[derive(Debug)] -pub enum AcceptorError { - /// The inner service error - Service(T), - - /// Io specific error - Io(io::Error), - - /// The request did not complete within the specified timeout. - Timeout, -} - -#[derive(Debug)] -/// A set of errors that can occur during dispatching http requests -pub enum HttpDispatchError { - /// Application error - // #[fail(display = "Application specific error: {}", _0)] - App(E), - - /// An `io::Error` that occurred while trying to read or write to a network - /// stream. - // #[fail(display = "IO error: {}", _0)] - Io(io::Error), - - /// Http request parse error. - // #[fail(display = "Parse error: {}", _0)] - Parse(ParseError), - - /// The first request did not complete within the specified timeout. - // #[fail(display = "The first request did not complete within the specified timeout")] - SlowRequestTimeout, - - /// Shutdown timeout - // #[fail(display = "Connection shutdown timeout")] - ShutdownTimeout, - - /// Payload is not consumed - // #[fail(display = "Task is completed but request's payload is not consumed")] - PayloadIsNotConsumed, - - /// Malformed request - // #[fail(display = "Malformed request")] - MalformedRequest, - - /// Internal error - // #[fail(display = "Internal error")] - InternalError, - - /// Unknown error - // #[fail(display = "Unknown error")] - Unknown, -} - -impl From for HttpDispatchError { - fn from(err: ParseError) -> Self { - HttpDispatchError::Parse(err) - } -} - -impl From for HttpDispatchError { - fn from(err: io::Error) -> Self { - HttpDispatchError::Io(err) - } -} diff --git a/src/server/h1.rs b/src/server/h1.rs deleted file mode 100644 index e2b4bf45e..000000000 --- a/src/server/h1.rs +++ /dev/null @@ -1,1289 +0,0 @@ -use std::collections::VecDeque; -use std::net::{Shutdown, SocketAddr}; -use std::time::{Duration, Instant}; - -use bytes::BytesMut; -use futures::{Async, Future, Poll}; -use tokio_current_thread::spawn; -use tokio_timer::Delay; - -use error::{Error, ParseError, PayloadError}; -use http::{StatusCode, Version}; -use payload::{Payload, PayloadStatus, PayloadWriter}; - -use super::error::{HttpDispatchError, ServerError}; -use super::h1decoder::{H1Decoder, Message}; -use super::h1writer::H1Writer; -use super::handler::{HttpHandler, HttpHandlerTask, HttpHandlerTaskFut}; -use super::input::PayloadType; -use super::message::Request; -use super::settings::ServiceConfig; -use super::{IoStream, Writer}; - -const MAX_PIPELINED_MESSAGES: usize = 16; - -bitflags! { - pub struct Flags: u8 { - const STARTED = 0b0000_0001; - const KEEPALIVE_ENABLED = 0b0000_0010; - const KEEPALIVE = 0b0000_0100; - const SHUTDOWN = 0b0000_1000; - const READ_DISCONNECTED = 0b0001_0000; - const WRITE_DISCONNECTED = 0b0010_0000; - const POLLED = 0b0100_0000; - const FLUSHED = 0b1000_0000; - } -} - -/// Dispatcher for HTTP/1.1 protocol -pub struct Http1Dispatcher { - flags: Flags, - settings: ServiceConfig, - addr: Option, - stream: H1Writer, - decoder: H1Decoder, - payload: Option, - buf: BytesMut, - tasks: VecDeque>, - error: Option>, - ka_expire: Instant, - ka_timer: Option, -} - -enum Entry { - Task(H::Task), - Error(Box), -} - -impl Entry { - fn into_task(self) -> H::Task { - match self { - Entry::Task(task) => task, - Entry::Error(_) => panic!(), - } - } - fn disconnected(&mut self) { - match *self { - Entry::Task(ref mut task) => task.disconnected(), - Entry::Error(ref mut task) => task.disconnected(), - } - } - fn poll_io(&mut self, io: &mut Writer) -> Poll { - match *self { - Entry::Task(ref mut task) => task.poll_io(io), - Entry::Error(ref mut task) => task.poll_io(io), - } - } - fn poll_completed(&mut self) -> Poll<(), Error> { - match *self { - Entry::Task(ref mut task) => task.poll_completed(), - Entry::Error(ref mut task) => task.poll_completed(), - } - } -} - -impl Http1Dispatcher -where - T: IoStream, - H: HttpHandler + 'static, -{ - pub fn new( - settings: ServiceConfig, stream: T, buf: BytesMut, is_eof: bool, - keepalive_timer: Option, - ) -> Self { - let addr = stream.peer_addr(); - let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer { - (delay.deadline(), Some(delay)) - } else if let Some(delay) = settings.keep_alive_timer() { - (delay.deadline(), Some(delay)) - } else { - (settings.now(), None) - }; - - let flags = if is_eof { - Flags::READ_DISCONNECTED | Flags::FLUSHED - } else if settings.keep_alive_enabled() { - Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED | Flags::FLUSHED - } else { - Flags::empty() - }; - - Http1Dispatcher { - stream: H1Writer::new(stream, settings.clone()), - decoder: H1Decoder::new(settings.request_pool()), - payload: None, - tasks: VecDeque::new(), - error: None, - flags, - addr, - buf, - settings, - ka_timer, - ka_expire, - } - } - - pub(crate) fn for_error( - settings: ServiceConfig, stream: T, status: StatusCode, - mut keepalive_timer: Option, buf: BytesMut, - ) -> Self { - if let Some(deadline) = settings.client_timer_expire() { - let _ = keepalive_timer.as_mut().map(|delay| delay.reset(deadline)); - } - - let mut disp = Http1Dispatcher { - flags: Flags::STARTED | Flags::READ_DISCONNECTED | Flags::FLUSHED, - stream: H1Writer::new(stream, settings.clone()), - decoder: H1Decoder::new(settings.request_pool()), - payload: None, - tasks: VecDeque::new(), - error: None, - addr: None, - ka_timer: keepalive_timer, - ka_expire: settings.now(), - buf, - settings, - }; - disp.push_response_entry(status); - disp - } - - #[inline] - pub fn settings(&self) -> &ServiceConfig { - &self.settings - } - - #[inline] - pub(crate) fn io(&mut self) -> &mut T { - self.stream.get_mut() - } - - #[inline] - fn can_read(&self) -> bool { - if self.flags.contains(Flags::READ_DISCONNECTED) { - return false; - } - - if let Some(ref info) = self.payload { - info.need_read() == PayloadStatus::Read - } else { - true - } - } - - // if checked is set to true, delay disconnect until all tasks have finished. - fn client_disconnected(&mut self, checked: bool) { - self.flags.insert(Flags::READ_DISCONNECTED); - if let Some(mut payload) = self.payload.take() { - payload.set_error(PayloadError::Incomplete); - } - - if !checked || self.tasks.is_empty() { - self.flags - .insert(Flags::WRITE_DISCONNECTED | Flags::FLUSHED); - self.stream.disconnected(); - - // notify all tasks - for mut task in self.tasks.drain(..) { - task.disconnected(); - match task.poll_completed() { - Ok(Async::NotReady) => { - // spawn not completed task, it does not require access to io - // at this point - spawn(HttpHandlerTaskFut::new(task.into_task())); - } - Ok(Async::Ready(_)) => (), - Err(err) => { - error!("Unhandled application error: {}", err); - } - } - } - } - } - - #[inline] - pub fn poll(&mut self) -> Poll<(), HttpDispatchError> { - // check connection keep-alive - self.poll_keep_alive()?; - - // shutdown - if self.flags.contains(Flags::SHUTDOWN) { - if self.flags.contains(Flags::WRITE_DISCONNECTED) { - return Ok(Async::Ready(())); - } - return self.poll_flush(true); - } - - // process incoming requests - if !self.flags.contains(Flags::WRITE_DISCONNECTED) { - self.poll_handler()?; - - // flush stream - self.poll_flush(false)?; - - // deal with keep-alive and stream eof (client-side write shutdown) - if self.tasks.is_empty() && self.flags.contains(Flags::FLUSHED) { - // handle stream eof - if self - .flags - .intersects(Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED) - { - return Ok(Async::Ready(())); - } - // no keep-alive - if self.flags.contains(Flags::STARTED) - && (!self.flags.contains(Flags::KEEPALIVE_ENABLED) - || !self.flags.contains(Flags::KEEPALIVE)) - { - self.flags.insert(Flags::SHUTDOWN); - return self.poll(); - } - } - Ok(Async::NotReady) - } else if let Some(err) = self.error.take() { - Err(err) - } else { - Ok(Async::Ready(())) - } - } - - /// Flush stream - fn poll_flush(&mut self, shutdown: bool) -> Poll<(), HttpDispatchError> { - if shutdown || self.flags.contains(Flags::STARTED) { - match self.stream.poll_completed(shutdown) { - Ok(Async::NotReady) => { - // mark stream - if !self.stream.flushed() { - self.flags.remove(Flags::FLUSHED); - } - Ok(Async::NotReady) - } - Err(err) => { - debug!("Error sending data: {}", err); - self.client_disconnected(false); - Err(err.into()) - } - Ok(Async::Ready(_)) => { - // if payload is not consumed we can not use connection - if self.payload.is_some() && self.tasks.is_empty() { - return Err(HttpDispatchError::PayloadIsNotConsumed); - } - self.flags.insert(Flags::FLUSHED); - Ok(Async::Ready(())) - } - } - } else { - Ok(Async::Ready(())) - } - } - - /// keep-alive timer. returns `true` is keep-alive, otherwise drop - fn poll_keep_alive(&mut self) -> Result<(), HttpDispatchError> { - if let Some(ref mut timer) = self.ka_timer { - match timer.poll() { - Ok(Async::Ready(_)) => { - if timer.deadline() >= self.ka_expire { - // check for any outstanding request handling - if self.tasks.is_empty() { - // if we get timer during shutdown, just drop connection - if self.flags.contains(Flags::SHUTDOWN) { - let io = self.stream.get_mut(); - let _ = IoStream::set_linger( - io, - Some(Duration::from_secs(0)), - ); - let _ = IoStream::shutdown(io, Shutdown::Both); - return Err(HttpDispatchError::ShutdownTimeout); - } else if !self.flags.contains(Flags::STARTED) { - // timeout on first request (slow request) return 408 - trace!("Slow request timeout"); - self.flags - .insert(Flags::STARTED | Flags::READ_DISCONNECTED); - self.tasks.push_back(Entry::Error(ServerError::err( - Version::HTTP_11, - StatusCode::REQUEST_TIMEOUT, - ))); - } else { - trace!("Keep-alive timeout, close connection"); - self.flags.insert(Flags::SHUTDOWN); - - // start shutdown timer - if let Some(deadline) = - self.settings.client_shutdown_timer() - { - timer.reset(deadline) - } else { - return Ok(()); - } - } - } else if let Some(deadline) = self.settings.keep_alive_expire() - { - timer.reset(deadline) - } - } else { - timer.reset(self.ka_expire) - } - } - Ok(Async::NotReady) => (), - Err(e) => { - error!("Timer error {:?}", e); - return Err(HttpDispatchError::Unknown); - } - } - } - - Ok(()) - } - - #[inline] - /// read data from the stream - pub(self) fn poll_io(&mut self) -> Result> { - if !self.flags.contains(Flags::POLLED) { - self.flags.insert(Flags::POLLED); - if !self.buf.is_empty() { - let updated = self.parse()?; - return Ok(updated); - } - } - - // read io from socket - let mut updated = false; - if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES { - match self.stream.get_mut().read_available(&mut self.buf) { - Ok(Async::Ready((read_some, disconnected))) => { - if read_some && self.parse()? { - updated = true; - } - if disconnected { - self.client_disconnected(true); - } - } - Ok(Async::NotReady) => (), - Err(err) => { - self.client_disconnected(false); - return Err(err.into()); - } - } - } - Ok(updated) - } - - pub(self) fn poll_handler(&mut self) -> Result<(), HttpDispatchError> { - self.poll_io()?; - let mut retry = self.can_read(); - - // process first pipelined response, only first task can do io operation in http/1 - while !self.tasks.is_empty() { - match self.tasks[0].poll_io(&mut self.stream) { - Ok(Async::Ready(ready)) => { - // override keep-alive state - if self.stream.keepalive() { - self.flags.insert(Flags::KEEPALIVE); - } else { - self.flags.remove(Flags::KEEPALIVE); - } - // prepare stream for next response - self.stream.reset(); - - let task = self.tasks.pop_front().unwrap(); - if !ready { - // task is done with io operations but still needs to do more work - spawn(HttpHandlerTaskFut::new(task.into_task())); - } - } - Ok(Async::NotReady) => { - // check if we need timer - if self.ka_timer.is_some() && self.stream.upgrade() { - self.ka_timer.take(); - } - - // if read-backpressure is enabled and we consumed some data. - // we may read more dataand retry - if !retry && self.can_read() && self.poll_io()? { - retry = self.can_read(); - continue; - } - break; - } - Err(err) => { - error!("Unhandled error1: {}", err); - // it is not possible to recover from error - // during pipe handling, so just drop connection - self.client_disconnected(false); - return Err(HttpDispatchError::App(err)); - } - } - } - - // check in-flight messages. all tasks must be alive, - // they need to produce response. if app returned error - // and we can not continue processing incoming requests. - let mut idx = 1; - while idx < self.tasks.len() { - let stop = match self.tasks[idx].poll_completed() { - Ok(Async::NotReady) => false, - Ok(Async::Ready(_)) => true, - Err(err) => { - self.error = Some(HttpDispatchError::App(err)); - true - } - }; - if stop { - // error in task handling or task is completed, - // so no response for this task which means we can not read more requests - // because pipeline sequence is broken. - // but we can safely complete existing tasks - self.flags.insert(Flags::READ_DISCONNECTED); - - for mut task in self.tasks.drain(idx..) { - task.disconnected(); - match task.poll_completed() { - Ok(Async::NotReady) => { - // spawn not completed task, it does not require access to io - // at this point - spawn(HttpHandlerTaskFut::new(task.into_task())); - } - Ok(Async::Ready(_)) => (), - Err(err) => { - error!("Unhandled application error: {}", err); - } - } - } - break; - } else { - idx += 1; - } - } - - Ok(()) - } - - fn push_response_entry(&mut self, status: StatusCode) { - self.tasks - .push_back(Entry::Error(ServerError::err(Version::HTTP_11, status))); - } - - fn handle_message( - &mut self, mut msg: Request, payload: bool, - ) -> Result<(), HttpDispatchError> { - self.flags.insert(Flags::STARTED); - - if payload { - let (ps, pl) = Payload::new(false); - *msg.inner.payload.borrow_mut() = Some(pl); - self.payload = Some(PayloadType::new(&msg.inner.headers, ps)); - } - - // stream extensions - msg.inner_mut().stream_extensions = self.stream.get_mut().extensions(); - - // set remote addr - msg.inner_mut().addr = self.addr; - - // search handler for request - match self.settings.handler().handle(msg) { - Ok(mut task) => { - if self.tasks.is_empty() { - match task.poll_io(&mut self.stream) { - Ok(Async::Ready(ready)) => { - // override keep-alive state - if self.stream.keepalive() { - self.flags.insert(Flags::KEEPALIVE); - } else { - self.flags.remove(Flags::KEEPALIVE); - } - // prepare stream for next response - self.stream.reset(); - - if !ready { - // task is done with io operations - // but still needs to do more work - spawn(HttpHandlerTaskFut::new(task)); - } - } - Ok(Async::NotReady) => (), - Err(err) => { - error!("Unhandled error: {}", err); - self.client_disconnected(false); - return Err(HttpDispatchError::App(err)); - } - } - } else { - self.tasks.push_back(Entry::Task(task)); - } - } - Err(_) => { - // handler is not found - self.push_response_entry(StatusCode::NOT_FOUND); - } - } - Ok(()) - } - - pub(self) fn parse(&mut self) -> Result> { - let mut updated = false; - - 'outer: loop { - match self.decoder.decode(&mut self.buf) { - Ok(Some(Message::Message(msg))) => { - updated = true; - self.handle_message(msg, false)?; - } - Ok(Some(Message::MessageWithPayload(msg))) => { - updated = true; - self.handle_message(msg, true)?; - } - Ok(Some(Message::Chunk(chunk))) => { - updated = true; - if let Some(ref mut payload) = self.payload { - payload.feed_data(chunk); - } else { - error!("Internal server error: unexpected payload chunk"); - self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); - self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); - self.error = Some(HttpDispatchError::InternalError); - break; - } - } - Ok(Some(Message::Eof)) => { - updated = true; - if let Some(mut payload) = self.payload.take() { - payload.feed_eof(); - } else { - error!("Internal server error: unexpected eof"); - self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); - self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); - self.error = Some(HttpDispatchError::InternalError); - break; - } - } - Ok(None) => { - if self.flags.contains(Flags::READ_DISCONNECTED) { - self.client_disconnected(true); - } - break; - } - Err(e) => { - if let Some(mut payload) = self.payload.take() { - let e = match e { - ParseError::Io(e) => PayloadError::Io(e), - _ => PayloadError::EncodingCorrupted, - }; - payload.set_error(e); - } - - // Malformed requests should be responded with 400 - self.push_response_entry(StatusCode::BAD_REQUEST); - self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); - self.error = Some(HttpDispatchError::MalformedRequest); - break; - } - } - } - - if self.ka_timer.is_some() && updated { - if let Some(expire) = self.settings.keep_alive_expire() { - self.ka_expire = expire; - } - } - Ok(updated) - } -} - -#[cfg(test)] -mod tests { - use std::net::Shutdown; - use std::{cmp, io, time}; - - use actix::System; - use bytes::{Buf, Bytes, BytesMut}; - use futures::future; - use http::{Method, Version}; - use tokio_io::{AsyncRead, AsyncWrite}; - - use super::*; - use application::{App, HttpApplication}; - use error::ParseError; - use httpmessage::HttpMessage; - use server::h1decoder::Message; - use server::handler::IntoHttpHandler; - use server::settings::{ServerSettings, ServiceConfig}; - use server::{KeepAlive, Request}; - - fn wrk_settings() -> ServiceConfig { - ServiceConfig::::new( - App::new().into_handler(), - KeepAlive::Os, - 5000, - 2000, - ServerSettings::default(), - ) - } - - impl Message { - fn message(self) -> Request { - match self { - Message::Message(msg) => msg, - Message::MessageWithPayload(msg) => msg, - _ => panic!("error"), - } - } - fn is_payload(&self) -> bool { - match *self { - Message::MessageWithPayload(_) => true, - _ => panic!("error"), - } - } - fn chunk(self) -> Bytes { - match self { - Message::Chunk(chunk) => chunk, - _ => panic!("error"), - } - } - fn eof(&self) -> bool { - match *self { - Message::Eof => true, - _ => false, - } - } - } - - macro_rules! parse_ready { - ($e:expr) => {{ - let settings = wrk_settings(); - match H1Decoder::new(settings.request_pool()).decode($e) { - Ok(Some(msg)) => msg.message(), - Ok(_) => unreachable!("Eof during parsing http request"), - Err(err) => unreachable!("Error during parsing http request: {:?}", err), - } - }}; - } - - macro_rules! expect_parse_err { - ($e:expr) => {{ - let settings = wrk_settings(); - - match H1Decoder::new(settings.request_pool()).decode($e) { - Err(err) => match err { - ParseError::Io(_) => unreachable!("Parse error expected"), - _ => (), - }, - _ => unreachable!("Error expected"), - } - }}; - } - - struct Buffer { - buf: Bytes, - err: Option, - } - - impl Buffer { - fn new(data: &'static str) -> Buffer { - Buffer { - buf: Bytes::from(data), - err: None, - } - } - } - - impl AsyncRead for Buffer {} - impl io::Read for Buffer { - fn read(&mut self, dst: &mut [u8]) -> Result { - if self.buf.is_empty() { - if self.err.is_some() { - Err(self.err.take().unwrap()) - } else { - Err(io::Error::new(io::ErrorKind::WouldBlock, "")) - } - } else { - let size = cmp::min(self.buf.len(), dst.len()); - let b = self.buf.split_to(size); - dst[..size].copy_from_slice(&b); - Ok(size) - } - } - } - - impl IoStream for Buffer { - fn shutdown(&mut self, _: Shutdown) -> io::Result<()> { - Ok(()) - } - fn set_nodelay(&mut self, _: bool) -> io::Result<()> { - Ok(()) - } - fn set_linger(&mut self, _: Option) -> io::Result<()> { - Ok(()) - } - fn set_keepalive(&mut self, _: Option) -> io::Result<()> { - Ok(()) - } - } - impl io::Write for Buffer { - fn write(&mut self, buf: &[u8]) -> io::Result { - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - } - impl AsyncWrite for Buffer { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(Async::Ready(())) - } - fn write_buf(&mut self, _: &mut B) -> Poll { - Ok(Async::NotReady) - } - } - - #[test] - fn test_req_parse_err() { - let mut sys = System::new("test"); - let _ = sys.block_on(future::lazy(|| { - let buf = Buffer::new("GET /test HTTP/1\r\n\r\n"); - let readbuf = BytesMut::new(); - let settings = wrk_settings(); - - let mut h1 = - Http1Dispatcher::new(settings.clone(), buf, readbuf, false, None); - assert!(h1.poll_io().is_ok()); - assert!(h1.poll_io().is_ok()); - assert!(h1.flags.contains(Flags::READ_DISCONNECTED)); - assert_eq!(h1.tasks.len(), 1); - future::ok::<_, ()>(()) - })); - } - - #[test] - fn test_parse() { - let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n\r\n"); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } - } - - #[test] - fn test_parse_partial() { - let mut buf = BytesMut::from("PUT /test HTTP/1"); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - match reader.decode(&mut buf) { - Ok(None) => (), - _ => unreachable!("Error"), - } - - buf.extend(b".1\r\n\r\n"); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::PUT); - assert_eq!(req.path(), "/test"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } - } - - #[test] - fn test_parse_post() { - let mut buf = BytesMut::from("POST /test2 HTTP/1.0\r\n\r\n"); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_10); - assert_eq!(*req.method(), Method::POST); - assert_eq!(req.path(), "/test2"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } - } - - #[test] - fn test_parse_body() { - let mut buf = - BytesMut::from("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), - b"body" - ); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } - } - - #[test] - fn test_parse_body_crlf() { - let mut buf = - BytesMut::from("\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), - b"body" - ); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } - } - - #[test] - fn test_parse_partial_eof() { - let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); - let settings = wrk_settings(); - let mut reader = H1Decoder::new(settings.request_pool()); - assert!(reader.decode(&mut buf).unwrap().is_none()); - - buf.extend(b"\r\n"); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } - } - - #[test] - fn test_headers_split_field() { - let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - assert!{ reader.decode(&mut buf).unwrap().is_none() } - - buf.extend(b"t"); - assert!{ reader.decode(&mut buf).unwrap().is_none() } - - buf.extend(b"es"); - assert!{ reader.decode(&mut buf).unwrap().is_none() } - - buf.extend(b"t: value\r\n\r\n"); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } - } - - #[test] - fn test_headers_multi_value() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - Set-Cookie: c1=cookie1\r\n\ - Set-Cookie: c2=cookie2\r\n\r\n", - ); - let settings = wrk_settings(); - let mut reader = H1Decoder::new(settings.request_pool()); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - let req = msg.message(); - - let val: Vec<_> = req - .headers() - .get_all("Set-Cookie") - .iter() - .map(|v| v.to_str().unwrap().to_owned()) - .collect(); - assert_eq!(val[0], "c1=cookie1"); - assert_eq!(val[1], "c2=cookie2"); - } - - #[test] - fn test_conn_default_1_0() { - let mut buf = BytesMut::from("GET /test HTTP/1.0\r\n\r\n"); - let req = parse_ready!(&mut buf); - - assert!(!req.keep_alive()); - } - - #[test] - fn test_conn_default_1_1() { - let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n\r\n"); - let req = parse_ready!(&mut buf); - - assert!(req.keep_alive()); - } - - #[test] - fn test_conn_close() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - connection: close\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(!req.keep_alive()); - } - - #[test] - fn test_conn_close_1_0() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.0\r\n\ - connection: close\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(!req.keep_alive()); - } - - #[test] - fn test_conn_keep_alive_1_0() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.0\r\n\ - connection: keep-alive\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(req.keep_alive()); - } - - #[test] - fn test_conn_keep_alive_1_1() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - connection: keep-alive\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(req.keep_alive()); - } - - #[test] - fn test_conn_other_1_0() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.0\r\n\ - connection: other\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(!req.keep_alive()); - } - - #[test] - fn test_conn_other_1_1() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - connection: other\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(req.keep_alive()); - } - - #[test] - fn test_conn_upgrade() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - upgrade: websockets\r\n\ - connection: upgrade\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(req.upgrade()); - } - - #[test] - fn test_conn_upgrade_connect_method() { - let mut buf = BytesMut::from( - "CONNECT /test HTTP/1.1\r\n\ - content-type: text/plain\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert!(req.upgrade()); - } - - #[test] - fn test_request_chunked() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - if let Ok(val) = req.chunked() { - assert!(val); - } else { - unreachable!("Error"); - } - - // type in chunked - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - transfer-encoding: chnked\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - if let Ok(val) = req.chunked() { - assert!(!val); - } else { - unreachable!("Error"); - } - } - - #[test] - fn test_headers_content_length_err_1() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - content-length: line\r\n\r\n", - ); - - expect_parse_err!(&mut buf) - } - - #[test] - fn test_headers_content_length_err_2() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - content-length: -1\r\n\r\n", - ); - - expect_parse_err!(&mut buf); - } - - #[test] - fn test_invalid_header() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - test line\r\n\r\n", - ); - - expect_parse_err!(&mut buf); - } - - #[test] - fn test_invalid_name() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - test[]: line\r\n\r\n", - ); - - expect_parse_err!(&mut buf); - } - - #[test] - fn test_http_request_bad_status_line() { - let mut buf = BytesMut::from("getpath \r\n\r\n"); - expect_parse_err!(&mut buf); - } - - #[test] - fn test_http_request_upgrade() { - let settings = wrk_settings(); - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - connection: upgrade\r\n\ - upgrade: websocket\r\n\r\n\ - some raw data", - ); - let mut reader = H1Decoder::new(settings.request_pool()); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); - assert!(!req.keep_alive()); - assert!(req.upgrade()); - assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), - b"some raw data" - ); - } - - #[test] - fn test_http_request_parser_utf8() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - x-test: тест\r\n\r\n", - ); - let req = parse_ready!(&mut buf); - - assert_eq!( - req.headers().get("x-test").unwrap().as_bytes(), - "тест".as_bytes() - ); - } - - #[test] - fn test_http_request_parser_two_slashes() { - let mut buf = BytesMut::from("GET //path HTTP/1.1\r\n\r\n"); - let req = parse_ready!(&mut buf); - - assert_eq!(req.path(), "//path"); - } - - #[test] - fn test_http_request_parser_bad_method() { - let mut buf = BytesMut::from("!12%()+=~$ /get HTTP/1.1\r\n\r\n"); - - expect_parse_err!(&mut buf); - } - - #[test] - fn test_http_request_parser_bad_version() { - let mut buf = BytesMut::from("GET //get HT/11\r\n\r\n"); - - expect_parse_err!(&mut buf); - } - - #[test] - fn test_http_request_chunked_payload() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n", - ); - let settings = wrk_settings(); - let mut reader = H1Decoder::new(settings.request_pool()); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); - assert!(req.chunked().unwrap()); - - buf.extend(b"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); - assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), - b"data" - ); - assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), - b"line" - ); - assert!(reader.decode(&mut buf).unwrap().unwrap().eof()); - } - - #[test] - fn test_http_request_chunked_payload_and_next_message() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n", - ); - let settings = wrk_settings(); - let mut reader = H1Decoder::new(settings.request_pool()); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); - assert!(req.chunked().unwrap()); - - buf.extend( - b"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\ - POST /test2 HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n" - .iter(), - ); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert_eq!(msg.chunk().as_ref(), b"data"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert_eq!(msg.chunk().as_ref(), b"line"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.eof()); - - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req2 = msg.message(); - assert!(req2.chunked().unwrap()); - assert_eq!(*req2.method(), Method::POST); - assert!(req2.chunked().unwrap()); - } - - #[test] - fn test_http_request_chunked_payload_chunks() { - let mut buf = BytesMut::from( - "GET /test HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n", - ); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); - assert!(req.chunked().unwrap()); - - buf.extend(b"4\r\n1111\r\n"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert_eq!(msg.chunk().as_ref(), b"1111"); - - buf.extend(b"4\r\ndata\r"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert_eq!(msg.chunk().as_ref(), b"data"); - - buf.extend(b"\n4"); - assert!(reader.decode(&mut buf).unwrap().is_none()); - - buf.extend(b"\r"); - assert!(reader.decode(&mut buf).unwrap().is_none()); - buf.extend(b"\n"); - assert!(reader.decode(&mut buf).unwrap().is_none()); - - buf.extend(b"li"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert_eq!(msg.chunk().as_ref(), b"li"); - - //trailers - //buf.feed_data("test: test\r\n"); - //not_ready!(reader.parse(&mut buf, &mut readbuf)); - - buf.extend(b"ne\r\n0\r\n"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert_eq!(msg.chunk().as_ref(), b"ne"); - assert!(reader.decode(&mut buf).unwrap().is_none()); - - buf.extend(b"\r\n"); - assert!(reader.decode(&mut buf).unwrap().unwrap().eof()); - } - - #[test] - fn test_parse_chunked_payload_chunk_extension() { - let mut buf = BytesMut::from( - &"GET /test HTTP/1.1\r\n\ - transfer-encoding: chunked\r\n\r\n"[..], - ); - let settings = wrk_settings(); - - let mut reader = H1Decoder::new(settings.request_pool()); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - assert!(msg.message().chunked().unwrap()); - - buf.extend(b"4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") - let chunk = reader.decode(&mut buf).unwrap().unwrap().chunk(); - assert_eq!(chunk, Bytes::from_static(b"data")); - let chunk = reader.decode(&mut buf).unwrap().unwrap().chunk(); - assert_eq!(chunk, Bytes::from_static(b"line")); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.eof()); - } -} diff --git a/src/server/mod.rs b/src/server/mod.rs index be172e646..068094c2a 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -117,30 +117,11 @@ use tokio_tcp::TcpStream; pub use actix_net::server::{PauseServer, ResumeServer, StopServer}; -mod error; -// pub(crate) mod h1; -#[doc(hidden)] -pub mod h1codec; -#[doc(hidden)] -pub mod h1decoder; -pub(crate) mod helpers; pub(crate) mod input; -pub(crate) mod message; pub(crate) mod output; -// pub(crate) mod service; -pub(crate) mod settings; - -pub use self::error::{AcceptorError, HttpDispatchError}; -pub use self::message::Request; #[doc(hidden)] -pub mod h1disp; - -#[doc(hidden)] -pub use self::settings::{ServiceConfig, ServiceConfigBuilder}; - -#[doc(hidden)] -pub use self::helpers::write_content_length; +pub use super::helpers::write_content_length; use body::Binary; use extensions::Extensions; diff --git a/src/server/output.rs b/src/server/output.rs index 143ba4029..f20bd3266 100644 --- a/src/server/output.rs +++ b/src/server/output.rs @@ -13,10 +13,10 @@ use flate2::Compression; use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH}; use http::{StatusCode, Version}; -use super::message::InnerRequest; use body::{Binary, Body}; use header::ContentEncoding; use httpresponse::HttpResponse; +use request::InnerRequest; // #[derive(Debug)] // pub(crate) struct RequestInfo { diff --git a/tests/test_h1v2.rs b/tests/test_h1v2.rs index 77b6d202f..e32481bc2 100644 --- a/tests/test_h1v2.rs +++ b/tests/test_h1v2.rs @@ -12,9 +12,8 @@ use actix_net::service::{IntoNewService, IntoService}; use actix_web::{client, test}; use futures::future; -use actix_http::server::h1disp::Http1Dispatcher; -use actix_http::server::{KeepAlive, ServiceConfig}; -use actix_http::{Error, HttpResponse}; +use actix_http::server::KeepAlive; +use actix_http::{h1, Error, HttpResponse, ServiceConfig}; #[test] fn test_h1_v2() { @@ -30,17 +29,10 @@ fn test_h1_v2() { .server_address(addr) .finish(); - (move |io| { - let pool = settings.request_pool(); - Http1Dispatcher::new( - io, - pool, - (|req| { - println!("REQ: {:?}", req); - future::ok::<_, Error>(HttpResponse::Ok().finish()) - }).into_service(), - ) - }).into_new_service() + h1::H1Service::new(settings, |req| { + println!("REQ: {:?}", req); + future::ok::<_, Error>(HttpResponse::Ok().finish()) + }) }).unwrap() .run(); });