diff --git a/src/h1/mod.rs b/src/h1/mod.rs index f9abfea5c..266ebf39c 100644 --- a/src/h1/mod.rs +++ b/src/h1/mod.rs @@ -7,4 +7,4 @@ mod service; pub use self::codec::{Codec, InMessage, OutMessage}; pub use self::dispatcher::Dispatcher; -pub use self::service::{H1Service, H1ServiceHandler}; +pub use self::service::{H1Service, H1ServiceHandler, TakeRequest}; diff --git a/src/h1/service.rs b/src/h1/service.rs index 8038c9fb8..02535fc8c 100644 --- a/src/h1/service.rs +++ b/src/h1/service.rs @@ -1,15 +1,17 @@ use std::fmt::{Debug, Display}; use std::marker::PhantomData; +use actix_net::codec::Framed; use actix_net::service::{IntoNewService, NewService, Service}; -use futures::{Async, Future, Poll}; +use futures::{future, Async, Future, Poll, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use config::ServiceConfig; -use error::DispatchError; +use error::{DispatchError, ParseError}; use request::Request; use response::Response; +use super::codec::{Codec, InMessage}; use super::dispatcher::Dispatcher; /// `NewService` implementation for HTTP1 transport @@ -121,3 +123,78 @@ where Dispatcher::new(req, self.cfg.clone(), self.srv.clone()) } } + +/// `NewService` that implements, read one request from framed object feature. +pub struct TakeRequest { + _t: PhantomData, +} + +impl TakeRequest { + /// Create new `TakeRequest` instance. + pub fn new() -> Self { + TakeRequest { _t: PhantomData } + } +} + +impl NewService for TakeRequest +where + T: AsyncRead + AsyncWrite, +{ + type Request = Framed; + type Response = (Option, Framed); + type Error = ParseError; + type InitError = (); + type Service = TakeRequestService; + type Future = future::FutureResult; + + fn new_service(&self) -> Self::Future { + future::ok(TakeRequestService { _t: PhantomData }) + } +} + +/// `NewService` that implements, read one request from framed object feature. +pub struct TakeRequestService { + _t: PhantomData, +} + +impl Service for TakeRequestService +where + T: AsyncRead + AsyncWrite, +{ + type Request = Framed; + type Response = (Option, Framed); + type Error = ParseError; + type Future = TakeRequestServiceResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, framed: Self::Request) -> Self::Future { + TakeRequestServiceResponse { + framed: Some(framed), + } + } +} + +pub struct TakeRequestServiceResponse +where + T: AsyncRead + AsyncWrite, +{ + framed: Option>, +} + +impl Future for TakeRequestServiceResponse +where + T: AsyncRead + AsyncWrite, +{ + type Item = (Option, Framed); + type Error = ParseError; + + fn poll(&mut self) -> Poll { + match self.framed.as_mut().unwrap().poll()? { + Async::Ready(item) => Ok(Async::Ready((item, self.framed.take().unwrap()))), + Async::NotReady => Ok(Async::NotReady), + } + } +} diff --git a/src/ws/mod.rs b/src/ws/mod.rs index bd657d944..5ebb502bb 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -3,7 +3,6 @@ //! To setup a `WebSocket`, first do web socket handshake then on success //! convert `Payload` into a `WsStream` stream and then use `WsWriter` to //! communicate with the peer. -//! ``` use std::io; use error::ResponseError; diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 86a309191..a2a18ff28 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -9,8 +9,9 @@ use std::{io, thread}; use actix::System; use actix_net::codec::Framed; +use actix_net::framed::IntoFramed; use actix_net::server::Server; -use actix_net::service::IntoNewService; +use actix_net::service::NewServiceExt; use actix_web::{test, ws as web_ws}; use bytes::Bytes; use futures::future::{ok, Either}; @@ -18,7 +19,7 @@ use futures::{Future, Sink, Stream}; use actix_http::{h1, ws, ResponseError}; -fn ws_handler(req: ws::Message) -> impl Future { +fn ws_service(req: ws::Message) -> impl Future { match req { ws::Message::Ping(msg) => ok(ws::Message::Pong(msg)), ws::Message::Text(text) => ok(ws::Message::Text(text)), @@ -34,46 +35,40 @@ fn test_simple() { thread::spawn(move || { Server::new() .bind("test", addr, move || { - (|io| { - // read http request - let framed = Framed::new(io, h1::Codec::new(false)); - framed - .into_future() - .map_err(|_| ()) - .and_then(|(req, framed)| { - // validate request - if let Some(h1::InMessage::MessageWithPayload(req)) = req { - match ws::handshake(&req) { - Err(e) => { - // validation failed - let resp = e.error_response(); - Either::A( - framed - .send(h1::OutMessage::Response(resp)) - .map_err(|_| ()) - .map(|_| ()), - ) - } - Ok(mut resp) => Either::B( - // send response + IntoFramed::new(|| h1::Codec::new(false)) + .and_then(h1::TakeRequest::new().map_err(|_| ())) + .and_then(|(req, framed): (_, Framed<_, _>)| { + // validate request + if let Some(h1::InMessage::MessageWithPayload(req)) = req { + match ws::handshake(&req) { + Err(e) => { + // validation failed + let resp = e.error_response(); + Either::A( framed - .send(h1::OutMessage::Response( - resp.finish(), - )).map_err(|_| ()) - .and_then(|framed| { - // start websocket service - let framed = - framed.into_framed(ws::Codec::new()); - ws::Transport::with(framed, ws_handler) - .map_err(|_| ()) - }), - ), + .send(h1::OutMessage::Response(resp)) + .map_err(|_| ()) + .map(|_| ()), + ) } - } else { - panic!() + Ok(mut resp) => Either::B( + // send response + framed + .send(h1::OutMessage::Response(resp.finish())) + .map_err(|_| ()) + .and_then(|framed| { + // start websocket service + let framed = + framed.into_framed(ws::Codec::new()); + ws::Transport::with(framed, ws_service) + .map_err(|_| ()) + }), + ), } - }) - }).into_new_service() + } else { + panic!() + } + }) }).unwrap() .run(); });