diff --git a/src/h1/mod.rs b/src/h1/mod.rs index 266ebf39c..634136a47 100644 --- a/src/h1/mod.rs +++ b/src/h1/mod.rs @@ -6,5 +6,6 @@ mod encoder; mod service; pub use self::codec::{Codec, InMessage, OutMessage}; +pub use self::decoder::{PayloadDecoder, RequestDecoder}; pub use self::dispatcher::Dispatcher; -pub use self::service::{H1Service, H1ServiceHandler, TakeRequest}; +pub use self::service::{H1Service, H1ServiceHandler}; diff --git a/src/h1/service.rs b/src/h1/service.rs index de24f52c1..a7261df17 100644 --- a/src/h1/service.rs +++ b/src/h1/service.rs @@ -2,17 +2,15 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::net; -use actix_net::codec::Framed; use actix_net::service::{IntoNewService, NewService, Service}; -use futures::{future, Async, Future, Poll, Stream}; +use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use config::{KeepAlive, ServiceConfig}; -use error::{DispatchError, ParseError}; +use error::DispatchError; use request::Request; use response::Response; -use super::codec::{Codec, InMessage}; use super::dispatcher::Dispatcher; /// `NewService` implementation for HTTP1 transport @@ -257,78 +255,3 @@ where Dispatcher::new(req, self.cfg.clone(), self.srv.clone()) } } - -/// `NewService` that implements, read one request from framed object feature. -pub struct TakeRequest { - _t: PhantomData, -} - -impl TakeRequest { - /// Create new `TakeRequest` instance. - pub fn new() -> Self { - TakeRequest { _t: PhantomData } - } -} - -impl NewService for TakeRequest -where - T: AsyncRead + AsyncWrite, -{ - type Request = Framed; - type Response = (Option, Framed); - type Error = ParseError; - type InitError = (); - type Service = TakeRequestService; - type Future = future::FutureResult; - - fn new_service(&self) -> Self::Future { - future::ok(TakeRequestService { _t: PhantomData }) - } -} - -/// `NewService` that implements, read one request from framed object feature. -pub struct TakeRequestService { - _t: PhantomData, -} - -impl Service for TakeRequestService -where - T: AsyncRead + AsyncWrite, -{ - type Request = Framed; - type Response = (Option, Framed); - type Error = ParseError; - type Future = TakeRequestServiceResponse; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, framed: Self::Request) -> Self::Future { - TakeRequestServiceResponse { - framed: Some(framed), - } - } -} - -pub struct TakeRequestServiceResponse -where - T: AsyncRead + AsyncWrite, -{ - framed: Option>, -} - -impl Future for TakeRequestServiceResponse -where - T: AsyncRead + AsyncWrite, -{ - type Item = (Option, Framed); - type Error = ParseError; - - fn poll(&mut self) -> Poll { - match self.framed.as_mut().unwrap().poll()? { - Async::Ready(item) => Ok(Async::Ready((item, self.framed.take().unwrap()))), - Async::NotReady => Ok(Async::NotReady), - } - } -} diff --git a/tests/test_ws.rs b/tests/test_ws.rs index a2a18ff28..8a1098747 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -12,6 +12,7 @@ use actix_net::codec::Framed; use actix_net::framed::IntoFramed; use actix_net::server::Server; use actix_net::service::NewServiceExt; +use actix_net::stream::TakeItem; use actix_web::{test, ws as web_ws}; use bytes::Bytes; use futures::future::{ok, Either}; @@ -36,7 +37,7 @@ fn test_simple() { Server::new() .bind("test", addr, move || { IntoFramed::new(|| h1::Codec::new(false)) - .and_then(h1::TakeRequest::new().map_err(|_| ())) + .and_then(TakeItem::new().map_err(|_| ())) .and_then(|(req, framed): (_, Framed<_, _>)| { // validate request if let Some(h1::InMessage::MessageWithPayload(req)) = req {