1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

refactor types

This commit is contained in:
Nikolay Kim 2018-10-04 20:02:10 -07:00
parent b15b2dda22
commit 4ca711909b
15 changed files with 273 additions and 1623 deletions

View File

@ -12,10 +12,10 @@ use time;
use tokio_current_thread::spawn; use tokio_current_thread::spawn;
use tokio_timer::{sleep, Delay}; use tokio_timer::{sleep, Delay};
use super::message::{Request, RequestPool};
use super::KeepAlive;
use body::Body; use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
use request::{Request, RequestPool};
use server::KeepAlive;
// "Sun, 06 Nov 1994 08:49:37 GMT".len() // "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH: usize = 29; const DATE_VALUE_LENGTH: usize = 29;
@ -28,8 +28,6 @@ struct Inner {
client_timeout: u64, client_timeout: u64,
client_shutdown: u64, client_shutdown: u64,
ka_enabled: bool, ka_enabled: bool,
bytes: Rc<SharedBytesPool>,
messages: &'static RequestPool,
date: UnsafeCell<(bool, Date)>, date: UnsafeCell<(bool, Date)>,
} }
@ -60,8 +58,6 @@ impl ServiceConfig {
ka_enabled, ka_enabled,
client_timeout, client_timeout,
client_shutdown, client_shutdown,
bytes: Rc::new(SharedBytesPool::new()),
messages: RequestPool::pool(),
date: UnsafeCell::new((false, Date::new())), date: UnsafeCell::new((false, Date::new())),
})) }))
} }
@ -83,23 +79,6 @@ impl ServiceConfig {
self.0.ka_enabled self.0.ka_enabled
} }
pub(crate) fn get_bytes(&self) -> BytesMut {
self.0.bytes.get_bytes()
}
pub(crate) fn release_bytes(&self, bytes: BytesMut) {
self.0.bytes.release_bytes(bytes)
}
pub(crate) fn get_request(&self) -> Request {
RequestPool::get(self.0.messages)
}
#[doc(hidden)]
pub fn request_pool(&self) -> &'static RequestPool {
self.0.messages
}
fn update_date(&self) { fn update_date(&self) {
// Unsafe: WorkerSetting is !Sync and !Send // Unsafe: WorkerSetting is !Sync and !Send
unsafe { (*self.0.date.get()).0 = false }; unsafe { (*self.0.date.get()).0 = false };
@ -341,31 +320,6 @@ impl fmt::Write for Date {
} }
} }
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<BytesMut>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> BytesMut {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
BytesMut::new()
}
}
pub fn release_bytes(&self, mut bytes: BytesMut) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
bytes.clear();
v.push_front(bytes);
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -377,62 +377,56 @@ impl ResponseError for cookie::ParseError {
} }
} }
/// A set of errors that can occur during parsing multipart streams #[derive(Debug)]
#[derive(Fail, Debug)] /// A set of errors that can occur during dispatching http requests
pub enum MultipartError { pub enum DispatchError<E: fmt::Debug + fmt::Display> {
/// Content-Type header is not found /// Service error
#[fail(display = "No Content-type header found")] // #[fail(display = "Application specific error: {}", _0)]
NoContentType, Service(E),
/// Can not parse Content-Type header
#[fail(display = "Can not parse Content-Type header")] /// An `io::Error` that occurred while trying to read or write to a network
ParseContentType, /// stream.
/// Multipart boundary is not found // #[fail(display = "IO error: {}", _0)]
#[fail(display = "Multipart boundary is not found")] Io(io::Error),
Boundary,
/// Multipart stream is incomplete /// Http request parse error.
#[fail(display = "Multipart stream is incomplete")] // #[fail(display = "Parse error: {}", _0)]
Incomplete, Parse(ParseError),
/// Error during field parsing
#[fail(display = "{}", _0)] /// The first request did not complete within the specified timeout.
Parse(#[cause] ParseError), // #[fail(display = "The first request did not complete within the specified timeout")]
/// Payload error SlowRequestTimeout,
#[fail(display = "{}", _0)]
Payload(#[cause] PayloadError), /// Shutdown timeout
// #[fail(display = "Connection shutdown timeout")]
ShutdownTimeout,
/// Payload is not consumed
// #[fail(display = "Task is completed but request's payload is not consumed")]
PayloadIsNotConsumed,
/// Malformed request
// #[fail(display = "Malformed request")]
MalformedRequest,
/// Internal error
// #[fail(display = "Internal error")]
InternalError,
/// Unknown error
// #[fail(display = "Unknown error")]
Unknown,
} }
impl From<ParseError> for MultipartError { impl<E: fmt::Debug + fmt::Display> From<ParseError> for DispatchError<E> {
fn from(err: ParseError) -> MultipartError { fn from(err: ParseError) -> Self {
MultipartError::Parse(err) DispatchError::Parse(err)
} }
} }
impl From<PayloadError> for MultipartError { impl<E: fmt::Debug + fmt::Display> From<io::Error> for DispatchError<E> {
fn from(err: PayloadError) -> MultipartError { fn from(err: io::Error) -> Self {
MultipartError::Payload(err) DispatchError::Io(err)
}
}
/// Return `BadRequest` for `MultipartError`
impl ResponseError for MultipartError {
fn error_response(&self) -> HttpResponse {
HttpResponse::new(StatusCode::BAD_REQUEST)
}
}
/// Error during handling `Expect` header
#[derive(Fail, PartialEq, Debug)]
pub enum ExpectError {
/// Expect header value can not be converted to utf8
#[fail(display = "Expect header value can not be converted to utf8")]
Encoding,
/// Unknown expect value
#[fail(display = "Unknown expect value")]
UnknownExpect,
}
impl ResponseError for ExpectError {
fn error_response(&self) -> HttpResponse {
HttpResponse::with_body(StatusCode::EXPECTATION_FAILED, "Unknown Expect")
} }
} }
@ -565,28 +559,6 @@ impl From<ContentTypeError> for ReadlinesError {
} }
} }
/// Errors which can occur when attempting to interpret a segment string as a
/// valid path segment.
#[derive(Fail, Debug, PartialEq)]
pub enum UriSegmentError {
/// The segment started with the wrapped invalid character.
#[fail(display = "The segment started with the wrapped invalid character")]
BadStart(char),
/// The segment contained the wrapped invalid character.
#[fail(display = "The segment contained the wrapped invalid character")]
BadChar(char),
/// The segment ended with the wrapped invalid character.
#[fail(display = "The segment ended with the wrapped invalid character")]
BadEnd(char),
}
/// Return `BadRequest` for `UriSegmentError`
impl ResponseError for UriSegmentError {
fn error_response(&self) -> HttpResponse {
HttpResponse::new(StatusCode::BAD_REQUEST)
}
}
/// Errors which can occur when attempting to generate resource uri. /// Errors which can occur when attempting to generate resource uri.
#[derive(Fail, Debug, PartialEq)] #[derive(Fail, Debug, PartialEq)]
pub enum UrlGenerationError { pub enum UrlGenerationError {
@ -610,24 +582,6 @@ impl From<UrlParseError> for UrlGenerationError {
} }
} }
/// Errors which can occur when serving static files.
#[derive(Fail, Debug, PartialEq)]
pub enum StaticFileError {
/// Path is not a directory
#[fail(display = "Path is not a directory. Unable to serve static files")]
IsNotDirectory,
/// Cannot render directory
#[fail(display = "Unable to render directory without index file")]
IsDirectory,
}
/// Return `NotFound` for `StaticFileError`
impl ResponseError for StaticFileError {
fn error_response(&self) -> HttpResponse {
HttpResponse::new(StatusCode::NOT_FOUND)
}
}
/// Helper type that can wrap any error and generate custom response. /// Helper type that can wrap any error and generate custom response.
/// ///
/// In following example any `io::Error` will be converted into "BAD REQUEST" /// In following example any `io::Error` will be converted into "BAD REQUEST"

View File

@ -4,37 +4,45 @@ use std::io::{self, Write};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use tokio_codec::{Decoder, Encoder}; use tokio_codec::{Decoder, Encoder};
use super::h1decoder::{H1Decoder, Message}; use super::decoder::H1Decoder;
use super::helpers; pub use super::decoder::InMessage;
use super::message::RequestPool;
use super::output::{ResponseInfo, ResponseLength};
use body::Body; use body::Body;
use error::ParseError; use error::ParseError;
use helpers;
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use http::Version; use http::Version;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use request::RequestPool;
use server::output::{ResponseInfo, ResponseLength};
pub(crate) enum OutMessage { pub enum OutMessage {
Response(HttpResponse), Response(HttpResponse),
Payload(Bytes), Payload(Bytes),
} }
pub(crate) struct H1Codec { /// HTTP/1 Codec
pub struct Codec {
decoder: H1Decoder, decoder: H1Decoder,
encoder: H1Writer, encoder: H1Writer,
} }
impl H1Codec { impl Codec {
pub fn new(pool: &'static RequestPool) -> Self { /// Create HTTP/1 codec
H1Codec { pub fn new() -> Self {
Codec::with_pool(RequestPool::pool())
}
/// Create HTTP/1 codec with request's pool
pub(crate) fn with_pool(pool: &'static RequestPool) -> Self {
Codec {
decoder: H1Decoder::new(pool), decoder: H1Decoder::new(pool),
encoder: H1Writer::new(), encoder: H1Writer::new(),
} }
} }
} }
impl Decoder for H1Codec { impl Decoder for Codec {
type Item = Message; type Item = InMessage;
type Error = ParseError; type Error = ParseError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@ -42,7 +50,7 @@ impl Decoder for H1Codec {
} }
} }
impl Encoder for H1Codec { impl Encoder for Codec {
type Item = OutMessage; type Item = OutMessage;
type Error = io::Error; type Error = io::Error;

View File

@ -4,10 +4,10 @@ use bytes::{Bytes, BytesMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
use httparse; use httparse;
use super::message::{MessageFlags, Request, RequestPool};
use error::ParseError; use error::ParseError;
use http::header::{HeaderName, HeaderValue}; use http::header::{HeaderName, HeaderValue};
use http::{header, HttpTryFrom, Method, Uri, Version}; use http::{header, HttpTryFrom, Method, Uri, Version};
use request::{MessageFlags, Request, RequestPool};
use uri::Url; use uri::Url;
const MAX_BUFFER_SIZE: usize = 131_072; const MAX_BUFFER_SIZE: usize = 131_072;
@ -19,7 +19,7 @@ pub(crate) struct H1Decoder {
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Message { pub enum InMessage {
Message(Request), Message(Request),
MessageWithPayload(Request), MessageWithPayload(Request),
Chunk(Bytes), Chunk(Bytes),
@ -34,14 +34,16 @@ impl H1Decoder {
} }
} }
pub fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Message>, ParseError> { pub fn decode(
&mut self, src: &mut BytesMut,
) -> Result<Option<InMessage>, ParseError> {
// read payload // read payload
if self.decoder.is_some() { if self.decoder.is_some() {
match self.decoder.as_mut().unwrap().decode(src)? { match self.decoder.as_mut().unwrap().decode(src)? {
Async::Ready(Some(bytes)) => return Ok(Some(Message::Chunk(bytes))), Async::Ready(Some(bytes)) => return Ok(Some(InMessage::Chunk(bytes))),
Async::Ready(None) => { Async::Ready(None) => {
self.decoder.take(); self.decoder.take();
return Ok(Some(Message::Eof)); return Ok(Some(InMessage::Eof));
} }
Async::NotReady => return Ok(None), Async::NotReady => return Ok(None),
} }
@ -51,9 +53,9 @@ impl H1Decoder {
Async::Ready((msg, decoder)) => { Async::Ready((msg, decoder)) => {
self.decoder = decoder; self.decoder = decoder;
if self.decoder.is_some() { if self.decoder.is_some() {
Ok(Some(Message::MessageWithPayload(msg))) Ok(Some(InMessage::MessageWithPayload(msg)))
} else { } else {
Ok(Some(Message::Message(msg))) Ok(Some(InMessage::Message(msg)))
} }
} }
Async::NotReady => { Async::NotReady => {

View File

@ -9,21 +9,20 @@ use actix_net::service::Service;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use tokio_codec::Framed; use tokio_codec::Framed;
// use tokio_current_thread::spawn; // use tokio_current_thread::spawn;
use tokio_io::AsyncWrite; use tokio_io::{AsyncRead, AsyncWrite};
// use tokio_timer::Delay; // use tokio_timer::Delay;
use error::{ParseError, PayloadError}; use error::{ParseError, PayloadError};
use payload::{Payload, PayloadStatus, PayloadWriter}; use payload::{Payload, PayloadStatus, PayloadWriter};
use body::Body; use body::Body;
use error::DispatchError;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use super::error::HttpDispatchError; use request::{Request, RequestPool};
use super::h1codec::{H1Codec, OutMessage}; use server::input::PayloadType;
use super::h1decoder::Message;
use super::input::PayloadType; use super::codec::{Codec, InMessage, OutMessage};
use super::message::{Request, RequestPool};
use super::IoStream;
const MAX_PIPELINED_MESSAGES: usize = 16; const MAX_PIPELINED_MESSAGES: usize = 16;
@ -41,15 +40,14 @@ bitflags! {
} }
/// Dispatcher for HTTP/1.1 protocol /// Dispatcher for HTTP/1.1 protocol
pub struct Http1Dispatcher<T: IoStream, S: Service> pub struct Dispatcher<T, S: Service>
where where
S::Error: Debug + Display, S::Error: Debug + Display,
{ {
service: S, service: S,
flags: Flags, flags: Flags,
addr: Option<SocketAddr>, framed: Framed<T, Codec>,
framed: Framed<T, H1Codec>, error: Option<DispatchError<S::Error>>,
error: Option<HttpDispatchError<S::Error>>,
state: State<S>, state: State<S>,
payload: Option<PayloadType>, payload: Option<PayloadType>,
@ -74,26 +72,24 @@ impl<S: Service> State<S> {
} }
} }
impl<T, S> Http1Dispatcher<T, S> impl<T, S> Dispatcher<T, S>
where where
T: IoStream, T: AsyncRead + AsyncWrite,
S: Service<Request = Request, Response = HttpResponse>, S: Service<Request = Request, Response = HttpResponse>,
S::Error: Debug + Display, S::Error: Debug + Display,
{ {
pub fn new(stream: T, pool: &'static RequestPool, service: S) -> Self { /// Create http/1 dispatcher.
let addr = stream.peer_addr(); pub fn new(stream: T, service: S) -> Self {
let flags = Flags::FLUSHED; let flags = Flags::FLUSHED;
let codec = H1Codec::new(pool); let framed = Framed::new(stream, Codec::new());
let framed = Framed::new(stream, codec);
Http1Dispatcher { Dispatcher {
payload: None, payload: None,
state: State::None, state: State::None,
error: None, error: None,
messages: VecDeque::new(), messages: VecDeque::new(),
service, service,
flags, flags,
addr,
framed, framed,
} }
} }
@ -141,7 +137,7 @@ where
} }
/// Flush stream /// Flush stream
fn poll_flush(&mut self) -> Poll<(), HttpDispatchError<S::Error>> { fn poll_flush(&mut self) -> Poll<(), DispatchError<S::Error>> {
if self.flags.contains(Flags::STARTED) && !self.flags.contains(Flags::FLUSHED) { if self.flags.contains(Flags::STARTED) && !self.flags.contains(Flags::FLUSHED) {
match self.framed.poll_complete() { match self.framed.poll_complete() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
@ -153,7 +149,7 @@ where
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
// if payload is not consumed we can not use connection // if payload is not consumed we can not use connection
if self.payload.is_some() && self.state.is_empty() { if self.payload.is_some() && self.state.is_empty() {
return Err(HttpDispatchError::PayloadIsNotConsumed); return Err(DispatchError::PayloadIsNotConsumed);
} }
self.flags.insert(Flags::FLUSHED); self.flags.insert(Flags::FLUSHED);
Ok(Async::Ready(())) Ok(Async::Ready(()))
@ -164,7 +160,7 @@ where
} }
} }
pub(self) fn poll_handler(&mut self) -> Result<(), HttpDispatchError<S::Error>> { pub(self) fn poll_handler(&mut self) -> Result<(), DispatchError<S::Error>> {
self.poll_io()?; self.poll_io()?;
let mut retry = self.can_read(); let mut retry = self.can_read();
@ -185,7 +181,7 @@ where
} }
} }
Ok(Async::NotReady) => Some(Ok(State::Response(task))), Ok(Async::NotReady) => Some(Ok(State::Response(task))),
Err(err) => Some(Err(HttpDispatchError::App(err))), Err(err) => Some(Err(DispatchError::Service(err))),
} }
} else { } else {
None None
@ -207,7 +203,7 @@ where
Err(err) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during pipe handling, so just drop connection // during pipe handling, so just drop connection
Some(Err(HttpDispatchError::App(err))) Some(Err(DispatchError::Service(err)))
} }
} }
} }
@ -222,7 +218,7 @@ where
*item = Some(msg); *item = Some(msg);
return Ok(()); return Ok(());
} }
Err(err) => Some(Err(HttpDispatchError::Io(err))), Err(err) => Some(Err(DispatchError::Io(err))),
} }
} }
State::SendResponseWithPayload(ref mut item) => { State::SendResponseWithPayload(ref mut item) => {
@ -236,7 +232,7 @@ where
*item = Some((msg, body)); *item = Some((msg, body));
return Ok(()); return Ok(());
} }
Err(err) => Some(Err(HttpDispatchError::Io(err))), Err(err) => Some(Err(DispatchError::Io(err))),
} }
} }
}; };
@ -263,14 +259,11 @@ where
Ok(()) Ok(())
} }
fn one_message(&mut self, msg: Message) -> Result<(), HttpDispatchError<S::Error>> { fn one_message(&mut self, msg: InMessage) -> Result<(), DispatchError<S::Error>> {
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
match msg { match msg {
Message::Message(mut msg) => { InMessage::Message(msg) => {
// set remote addr
msg.inner_mut().addr = self.addr;
// handle request early // handle request early
if self.state.is_empty() { if self.state.is_empty() {
let mut task = self.service.call(msg); let mut task = self.service.call(msg);
@ -287,17 +280,14 @@ where
Err(err) => { Err(err) => {
error!("Unhandled application error: {}", err); error!("Unhandled application error: {}", err);
self.client_disconnected(false); self.client_disconnected(false);
return Err(HttpDispatchError::App(err)); return Err(DispatchError::Service(err));
} }
} }
} else { } else {
self.messages.push_back(msg); self.messages.push_back(msg);
} }
} }
Message::MessageWithPayload(mut msg) => { InMessage::MessageWithPayload(msg) => {
// set remote addr
msg.inner_mut().addr = self.addr;
// payload // payload
let (ps, pl) = Payload::new(false); let (ps, pl) = Payload::new(false);
*msg.inner.payload.borrow_mut() = Some(pl); *msg.inner.payload.borrow_mut() = Some(pl);
@ -305,24 +295,24 @@ where
self.messages.push_back(msg); self.messages.push_back(msg);
} }
Message::Chunk(chunk) => { InMessage::Chunk(chunk) => {
if let Some(ref mut payload) = self.payload { if let Some(ref mut payload) = self.payload {
payload.feed_data(chunk); payload.feed_data(chunk);
} else { } else {
error!("Internal server error: unexpected payload chunk"); error!("Internal server error: unexpected payload chunk");
self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED);
// self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); // self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR);
self.error = Some(HttpDispatchError::InternalError); self.error = Some(DispatchError::InternalError);
} }
} }
Message::Eof => { InMessage::Eof => {
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = self.payload.take() {
payload.feed_eof(); payload.feed_eof();
} else { } else {
error!("Internal server error: unexpected eof"); error!("Internal server error: unexpected eof");
self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED);
// self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); // self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR);
self.error = Some(HttpDispatchError::InternalError); self.error = Some(DispatchError::InternalError);
} }
} }
} }
@ -330,7 +320,7 @@ where
Ok(()) Ok(())
} }
pub(self) fn poll_io(&mut self) -> Result<bool, HttpDispatchError<S::Error>> { pub(self) fn poll_io(&mut self) -> Result<bool, DispatchError<S::Error>> {
let mut updated = false; let mut updated = false;
if self.messages.len() < MAX_PIPELINED_MESSAGES { if self.messages.len() < MAX_PIPELINED_MESSAGES {
@ -359,7 +349,7 @@ where
// Malformed requests should be responded with 400 // Malformed requests should be responded with 400
// self.push_response_entry(StatusCode::BAD_REQUEST); // self.push_response_entry(StatusCode::BAD_REQUEST);
self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED);
self.error = Some(HttpDispatchError::MalformedRequest); self.error = Some(DispatchError::MalformedRequest);
break; break;
} }
} }
@ -370,14 +360,14 @@ where
} }
} }
impl<T, S> Future for Http1Dispatcher<T, S> impl<T, S> Future for Dispatcher<T, S>
where where
T: IoStream, T: AsyncRead + AsyncWrite,
S: Service<Request = Request, Response = HttpResponse>, S: Service<Request = Request, Response = HttpResponse>,
S::Error: Debug + Display, S::Error: Debug + Display,
{ {
type Item = (); type Item = ();
type Error = HttpDispatchError<S::Error>; type Error = DispatchError<S::Error>;
#[inline] #[inline]
fn poll(&mut self) -> Poll<(), Self::Error> { fn poll(&mut self) -> Poll<(), Self::Error> {

9
src/h1/mod.rs Normal file
View File

@ -0,0 +1,9 @@
//! HTTP/1 implementation
mod codec;
mod decoder;
mod dispatcher;
mod service;
pub use self::codec::Codec;
pub use self::dispatcher::Dispatcher;
pub use self::service::{H1Service, H1ServiceHandler};

125
src/h1/service.rs Normal file
View File

@ -0,0 +1,125 @@
use std::fmt::{Debug, Display};
use std::marker::PhantomData;
use std::time::Duration;
use actix_net::service::{IntoNewService, NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use config::ServiceConfig;
use error::DispatchError;
use httpresponse::HttpResponse;
use request::Request;
use super::dispatcher::Dispatcher;
/// `NewService` implementation for HTTP1 transport
pub struct H1Service<T, S> {
srv: S,
cfg: ServiceConfig,
_t: PhantomData<T>,
}
impl<T, S> H1Service<T, S>
where
S: NewService,
{
/// Create new `HttpService` instance.
pub fn new<F: IntoNewService<S>>(cfg: ServiceConfig, service: F) -> Self {
H1Service {
cfg,
srv: service.into_new_service(),
_t: PhantomData,
}
}
}
impl<T, S> NewService for H1Service<T, S>
where
T: AsyncRead + AsyncWrite,
S: NewService<Request = Request, Response = HttpResponse> + Clone,
S::Service: Clone,
S::Error: Debug + Display,
{
type Request = T;
type Response = ();
type Error = DispatchError<S::Error>;
type InitError = S::InitError;
type Service = H1ServiceHandler<T, S::Service>;
type Future = H1ServiceResponse<T, S>;
fn new_service(&self) -> Self::Future {
H1ServiceResponse {
fut: self.srv.new_service(),
cfg: Some(self.cfg.clone()),
_t: PhantomData,
}
}
}
pub struct H1ServiceResponse<T, S: NewService> {
fut: S::Future,
cfg: Option<ServiceConfig>,
_t: PhantomData<T>,
}
impl<T, S> Future for H1ServiceResponse<T, S>
where
T: AsyncRead + AsyncWrite,
S: NewService<Request = Request, Response = HttpResponse>,
S::Service: Clone,
S::Error: Debug + Display,
{
type Item = H1ServiceHandler<T, S::Service>;
type Error = S::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let service = try_ready!(self.fut.poll());
Ok(Async::Ready(H1ServiceHandler::new(
self.cfg.take().unwrap(),
service,
)))
}
}
/// `Service` implementation for HTTP1 transport
pub struct H1ServiceHandler<T, S> {
srv: S,
cfg: ServiceConfig,
_t: PhantomData<T>,
}
impl<T, S> H1ServiceHandler<T, S>
where
S: Service<Request = Request, Response = HttpResponse> + Clone,
S::Error: Debug + Display,
{
fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler<T, S> {
H1ServiceHandler {
srv,
cfg,
_t: PhantomData,
}
}
}
impl<T, S> Service for H1ServiceHandler<T, S>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request, Response = HttpResponse> + Clone,
S::Error: Debug + Display,
{
type Request = T;
type Response = ();
type Error = DispatchError<S::Error>;
type Future = Dispatcher<T, S>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.srv.poll_ready().map_err(|e| DispatchError::Service(e))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
Dispatcher::new(req, self.srv.clone())
}
}

View File

@ -135,6 +135,7 @@ extern crate actix_net;
extern crate serde_derive; extern crate serde_derive;
mod body; mod body;
mod config;
mod extensions; mod extensions;
mod header; mod header;
mod httpcodes; mod httpcodes;
@ -142,9 +143,12 @@ mod httpmessage;
mod httpresponse; mod httpresponse;
mod json; mod json;
mod payload; mod payload;
mod request;
mod uri; mod uri;
pub mod error; pub mod error;
pub mod h1;
pub(crate) mod helpers;
pub mod server; pub mod server;
//pub mod test; //pub mod test;
//pub mod ws; //pub mod ws;
@ -152,10 +156,11 @@ pub use body::{Binary, Body};
pub use error::{Error, ResponseError, Result}; pub use error::{Error, ResponseError, Result};
pub use extensions::Extensions; pub use extensions::Extensions;
pub use httpmessage::HttpMessage; pub use httpmessage::HttpMessage;
//pub use httprequest::HttpRequest;
pub use httpresponse::HttpResponse; pub use httpresponse::HttpResponse;
pub use json::Json; pub use json::Json;
pub use server::Request; pub use request::Request;
pub use self::config::{ServiceConfig, ServiceConfigBuilder};
pub mod dev { pub mod dev {
//! The `actix-web` prelude for library developers //! The `actix-web` prelude for library developers

View File

@ -30,7 +30,6 @@ pub(crate) struct InnerRequest {
pub(crate) flags: Cell<MessageFlags>, pub(crate) flags: Cell<MessageFlags>,
pub(crate) headers: HeaderMap, pub(crate) headers: HeaderMap,
pub(crate) extensions: RefCell<Extensions>, pub(crate) extensions: RefCell<Extensions>,
pub(crate) addr: Option<SocketAddr>,
pub(crate) payload: RefCell<Option<Payload>>, pub(crate) payload: RefCell<Option<Payload>>,
pub(crate) stream_extensions: Option<Rc<Extensions>>, pub(crate) stream_extensions: Option<Rc<Extensions>>,
pool: &'static RequestPool, pool: &'static RequestPool,
@ -65,8 +64,13 @@ impl HttpMessage for Request {
} }
impl Request { impl Request {
/// Create new RequestContext instance /// Create new Request instance
pub(crate) fn new(pool: &'static RequestPool) -> Request { pub fn new() -> Request {
Request::with_pool(RequestPool::pool())
}
/// Create new Request instance with pool
pub(crate) fn with_pool(pool: &'static RequestPool) -> Request {
Request { Request {
inner: Rc::new(InnerRequest { inner: Rc::new(InnerRequest {
pool, pool,
@ -75,7 +79,6 @@ impl Request {
version: Version::HTTP_11, version: Version::HTTP_11,
headers: HeaderMap::with_capacity(16), headers: HeaderMap::with_capacity(16),
flags: Cell::new(MessageFlags::empty()), flags: Cell::new(MessageFlags::empty()),
addr: None,
payload: RefCell::new(None), payload: RefCell::new(None),
extensions: RefCell::new(Extensions::new()), extensions: RefCell::new(Extensions::new()),
stream_extensions: None, stream_extensions: None,
@ -134,14 +137,6 @@ impl Request {
&mut self.inner_mut().headers &mut self.inner_mut().headers
} }
/// Peer socket address
///
/// Peer address is actual socket address, if proxy is used in front of
/// actix http server, then peer address would be address of this proxy.
pub fn peer_addr(&self) -> Option<SocketAddr> {
self.inner().addr
}
/// Checks if a connection should be kept alive. /// Checks if a connection should be kept alive.
#[inline] #[inline]
pub fn keep_alive(&self) -> bool { pub fn keep_alive(&self) -> bool {
@ -170,12 +165,6 @@ impl Request {
self.inner().method == Method::CONNECT self.inner().method == Method::CONNECT
} }
/// Io stream extensions
#[inline]
pub fn stream_extensions(&self) -> Option<&Extensions> {
self.inner().stream_extensions.as_ref().map(|e| e.as_ref())
}
pub(crate) fn clone(&self) -> Self { pub(crate) fn clone(&self) -> Self {
Request { Request {
inner: self.inner.clone(), inner: self.inner.clone(),
@ -213,7 +202,8 @@ impl fmt::Debug for Request {
} }
} }
pub struct RequestPool(RefCell<VecDeque<Rc<InnerRequest>>>); /// Request's objects pool
pub(crate) struct RequestPool(RefCell<VecDeque<Rc<InnerRequest>>>);
thread_local!(static POOL: &'static RequestPool = RequestPool::create()); thread_local!(static POOL: &'static RequestPool = RequestPool::create());
@ -223,16 +213,18 @@ impl RequestPool {
Box::leak(Box::new(pool)) Box::leak(Box::new(pool))
} }
pub(crate) fn pool() -> &'static RequestPool { /// Get default request's pool
pub fn pool() -> &'static RequestPool {
POOL.with(|p| *p) POOL.with(|p| *p)
} }
/// Get Request object
#[inline] #[inline]
pub fn get(pool: &'static RequestPool) -> Request { pub fn get(pool: &'static RequestPool) -> Request {
if let Some(msg) = pool.0.borrow_mut().pop_front() { if let Some(msg) = pool.0.borrow_mut().pop_front() {
Request { inner: msg } Request { inner: msg }
} else { } else {
Request::new(pool) Request::with_pool(pool)
} }
} }

View File

@ -1,73 +0,0 @@
use std::fmt::{Debug, Display};
use std::io;
use futures::{Async, Poll};
use error::{Error, ParseError};
use http::{StatusCode, Version};
/// Errors produced by `AcceptorError` service.
#[derive(Debug)]
pub enum AcceptorError<T> {
/// The inner service error
Service(T),
/// Io specific error
Io(io::Error),
/// The request did not complete within the specified timeout.
Timeout,
}
#[derive(Debug)]
/// A set of errors that can occur during dispatching http requests
pub enum HttpDispatchError<E: Debug + Display> {
/// Application error
// #[fail(display = "Application specific error: {}", _0)]
App(E),
/// An `io::Error` that occurred while trying to read or write to a network
/// stream.
// #[fail(display = "IO error: {}", _0)]
Io(io::Error),
/// Http request parse error.
// #[fail(display = "Parse error: {}", _0)]
Parse(ParseError),
/// The first request did not complete within the specified timeout.
// #[fail(display = "The first request did not complete within the specified timeout")]
SlowRequestTimeout,
/// Shutdown timeout
// #[fail(display = "Connection shutdown timeout")]
ShutdownTimeout,
/// Payload is not consumed
// #[fail(display = "Task is completed but request's payload is not consumed")]
PayloadIsNotConsumed,
/// Malformed request
// #[fail(display = "Malformed request")]
MalformedRequest,
/// Internal error
// #[fail(display = "Internal error")]
InternalError,
/// Unknown error
// #[fail(display = "Unknown error")]
Unknown,
}
impl<E: Debug + Display> From<ParseError> for HttpDispatchError<E> {
fn from(err: ParseError) -> Self {
HttpDispatchError::Parse(err)
}
}
impl<E: Debug + Display> From<io::Error> for HttpDispatchError<E> {
fn from(err: io::Error) -> Self {
HttpDispatchError::Io(err)
}
}

File diff suppressed because it is too large Load Diff

View File

@ -117,30 +117,11 @@ use tokio_tcp::TcpStream;
pub use actix_net::server::{PauseServer, ResumeServer, StopServer}; pub use actix_net::server::{PauseServer, ResumeServer, StopServer};
mod error;
// pub(crate) mod h1;
#[doc(hidden)]
pub mod h1codec;
#[doc(hidden)]
pub mod h1decoder;
pub(crate) mod helpers;
pub(crate) mod input; pub(crate) mod input;
pub(crate) mod message;
pub(crate) mod output; pub(crate) mod output;
// pub(crate) mod service;
pub(crate) mod settings;
pub use self::error::{AcceptorError, HttpDispatchError};
pub use self::message::Request;
#[doc(hidden)] #[doc(hidden)]
pub mod h1disp; pub use super::helpers::write_content_length;
#[doc(hidden)]
pub use self::settings::{ServiceConfig, ServiceConfigBuilder};
#[doc(hidden)]
pub use self::helpers::write_content_length;
use body::Binary; use body::Binary;
use extensions::Extensions; use extensions::Extensions;

View File

@ -13,10 +13,10 @@ use flate2::Compression;
use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH}; use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH};
use http::{StatusCode, Version}; use http::{StatusCode, Version};
use super::message::InnerRequest;
use body::{Binary, Body}; use body::{Binary, Body};
use header::ContentEncoding; use header::ContentEncoding;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use request::InnerRequest;
// #[derive(Debug)] // #[derive(Debug)]
// pub(crate) struct RequestInfo { // pub(crate) struct RequestInfo {

View File

@ -12,9 +12,8 @@ use actix_net::service::{IntoNewService, IntoService};
use actix_web::{client, test}; use actix_web::{client, test};
use futures::future; use futures::future;
use actix_http::server::h1disp::Http1Dispatcher; use actix_http::server::KeepAlive;
use actix_http::server::{KeepAlive, ServiceConfig}; use actix_http::{h1, Error, HttpResponse, ServiceConfig};
use actix_http::{Error, HttpResponse};
#[test] #[test]
fn test_h1_v2() { fn test_h1_v2() {
@ -30,17 +29,10 @@ fn test_h1_v2() {
.server_address(addr) .server_address(addr)
.finish(); .finish();
(move |io| { h1::H1Service::new(settings, |req| {
let pool = settings.request_pool(); println!("REQ: {:?}", req);
Http1Dispatcher::new( future::ok::<_, Error>(HttpResponse::Ok().finish())
io, })
pool,
(|req| {
println!("REQ: {:?}", req);
future::ok::<_, Error>(HttpResponse::Ok().finish())
}).into_service(),
)
}).into_new_service()
}).unwrap() }).unwrap()
.run(); .run();
}); });