diff --git a/actix-http/examples/framed_hello.rs b/actix-http/examples/framed_hello.rs deleted file mode 100644 index 7d4c13d34..000000000 --- a/actix-http/examples/framed_hello.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::{env, io}; - -use actix_codec::Framed; -use actix_http::{h1, Response, SendResponse, ServiceConfig}; -use actix_server::{Io, Server}; -use actix_service::{fn_service, NewService}; -use actix_utils::framed::IntoFramed; -use actix_utils::stream::TakeItem; -use futures::Future; -use tokio_tcp::TcpStream; - -fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "framed_hello=info"); - env_logger::init(); - - Server::build() - .bind("framed_hello", "127.0.0.1:8080", || { - fn_service(|io: Io| Ok(io.into_parts().0)) - .and_then(IntoFramed::new(|| h1::Codec::new(ServiceConfig::default()))) - .and_then(TakeItem::new().map_err(|_| ())) - .and_then(|(_req, _framed): (_, Framed<_, _>)| { - SendResponse::send(_framed, Response::Ok().body("Hello world!")) - .map_err(|_| ()) - .map(|_| ()) - }) - })? - .run() -} diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 088125ae0..ed3669e85 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -37,7 +37,7 @@ pub use self::message::{Message, RequestHead, ResponseHead}; pub use self::payload::{Payload, PayloadStream}; pub use self::request::Request; pub use self::response::{Response, ResponseBuilder}; -pub use self::service::{HttpService, SendError, SendResponse}; +pub use self::service::HttpService; pub mod http { //! Various HTTP related types diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index ff0ce48de..c3fed133d 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -208,7 +208,7 @@ impl Response { } /// Set a body - pub(crate) fn set_body(self, body: B2) -> Response { + pub fn set_body(self, body: B2) -> Response { Response { head: self.head, body: ResponseBody::Body(body), @@ -217,7 +217,7 @@ impl Response { } /// Drop request's body - pub(crate) fn drop_body(self) -> Response<()> { + pub fn drop_body(self) -> Response<()> { Response { head: self.head, body: ResponseBody::Body(()), diff --git a/actix-http/src/service/service.rs b/actix-http/src/service.rs similarity index 100% rename from actix-http/src/service/service.rs rename to actix-http/src/service.rs diff --git a/actix-http/src/service/mod.rs b/actix-http/src/service/mod.rs deleted file mode 100644 index 25e95bf60..000000000 --- a/actix-http/src/service/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod senderror; -mod service; - -pub use self::senderror::{SendError, SendResponse}; -pub use self::service::HttpService; diff --git a/actix-http/src/service/senderror.rs b/actix-http/src/service/senderror.rs deleted file mode 100644 index 03fe5976a..000000000 --- a/actix-http/src/service/senderror.rs +++ /dev/null @@ -1,241 +0,0 @@ -use std::marker::PhantomData; - -use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_service::{NewService, Service}; -use futures::future::{ok, Either, FutureResult}; -use futures::{Async, Future, Poll, Sink}; - -use crate::body::{BodySize, MessageBody, ResponseBody}; -use crate::error::{Error, ResponseError}; -use crate::h1::{Codec, Message}; -use crate::response::Response; - -pub struct SendError(PhantomData<(T, R, E)>); - -impl Default for SendError -where - T: AsyncRead + AsyncWrite, - E: ResponseError, -{ - fn default() -> Self { - SendError(PhantomData) - } -} - -impl NewService for SendError -where - T: AsyncRead + AsyncWrite, - E: ResponseError, -{ - type Request = Result)>; - type Response = R; - type Error = (E, Framed); - type InitError = (); - type Service = SendError; - type Future = FutureResult; - - fn new_service(&self, _: &()) -> Self::Future { - ok(SendError(PhantomData)) - } -} - -impl Service for SendError -where - T: AsyncRead + AsyncWrite, - E: ResponseError, -{ - type Request = Result)>; - type Response = R; - type Error = (E, Framed); - type Future = Either)>, SendErrorFut>; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Result)>) -> Self::Future { - match req { - Ok(r) => Either::A(ok(r)), - Err((e, framed)) => { - let res = e.error_response().set_body(format!("{}", e)); - let (res, _body) = res.replace_body(()); - Either::B(SendErrorFut { - framed: Some(framed), - res: Some((res, BodySize::Empty).into()), - err: Some(e), - _t: PhantomData, - }) - } - } - } -} - -pub struct SendErrorFut { - res: Option, BodySize)>>, - framed: Option>, - err: Option, - _t: PhantomData, -} - -impl Future for SendErrorFut -where - E: ResponseError, - T: AsyncRead + AsyncWrite, -{ - type Item = R; - type Error = (E, Framed); - - fn poll(&mut self) -> Poll { - if let Some(res) = self.res.take() { - if self.framed.as_mut().unwrap().force_send(res).is_err() { - return Err((self.err.take().unwrap(), self.framed.take().unwrap())); - } - } - match self.framed.as_mut().unwrap().poll_complete() { - Ok(Async::Ready(_)) => { - Err((self.err.take().unwrap(), self.framed.take().unwrap())) - } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => Err((self.err.take().unwrap(), self.framed.take().unwrap())), - } - } -} - -pub struct SendResponse(PhantomData<(T, B)>); - -impl Default for SendResponse { - fn default() -> Self { - SendResponse(PhantomData) - } -} - -impl SendResponse -where - T: AsyncRead + AsyncWrite, - B: MessageBody, -{ - pub fn send( - framed: Framed, - res: Response, - ) -> impl Future, Error = Error> { - // extract body from response - let (res, body) = res.replace_body(()); - - // write response - SendResponseFut { - res: Some(Message::Item((res, body.length()))), - body: Some(body), - framed: Some(framed), - } - } -} - -impl NewService for SendResponse -where - T: AsyncRead + AsyncWrite, - B: MessageBody, -{ - type Request = (Response, Framed); - type Response = Framed; - type Error = Error; - type InitError = (); - type Service = SendResponse; - type Future = FutureResult; - - fn new_service(&self, _: &()) -> Self::Future { - ok(SendResponse(PhantomData)) - } -} - -impl Service for SendResponse -where - T: AsyncRead + AsyncWrite, - B: MessageBody, -{ - type Request = (Response, Framed); - type Response = Framed; - type Error = Error; - type Future = SendResponseFut; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, (res, framed): (Response, Framed)) -> Self::Future { - let (res, body) = res.replace_body(()); - SendResponseFut { - res: Some(Message::Item((res, body.length()))), - body: Some(body), - framed: Some(framed), - } - } -} - -pub struct SendResponseFut { - res: Option, BodySize)>>, - body: Option>, - framed: Option>, -} - -impl Future for SendResponseFut -where - T: AsyncRead + AsyncWrite, - B: MessageBody, -{ - type Item = Framed; - type Error = Error; - - fn poll(&mut self) -> Poll { - loop { - let mut body_ready = self.body.is_some(); - let framed = self.framed.as_mut().unwrap(); - - // send body - if self.res.is_none() && self.body.is_some() { - while body_ready && self.body.is_some() && !framed.is_write_buf_full() { - match self.body.as_mut().unwrap().poll_next()? { - Async::Ready(item) => { - // body is done - if item.is_none() { - let _ = self.body.take(); - } - framed.force_send(Message::Chunk(item))?; - } - Async::NotReady => body_ready = false, - } - } - } - - // flush write buffer - if !framed.is_write_buf_empty() { - match framed.poll_complete()? { - Async::Ready(_) => { - if body_ready { - continue; - } else { - return Ok(Async::NotReady); - } - } - Async::NotReady => return Ok(Async::NotReady), - } - } - - // send response - if let Some(res) = self.res.take() { - framed.force_send(res)?; - continue; - } - - if self.body.is_some() { - if body_ready { - continue; - } else { - return Ok(Async::NotReady); - } - } else { - break; - } - } - Ok(Async::Ready(self.framed.take().unwrap())) - } -} diff --git a/awc/tests/test_ws.rs b/awc/tests/test_ws.rs index d8942fb17..04a6a110a 100644 --- a/awc/tests/test_ws.rs +++ b/awc/tests/test_ws.rs @@ -11,7 +11,7 @@ use futures::future::{ok, Either}; use futures::{Future, Sink, Stream}; use tokio_tcp::TcpStream; -use actix_http::{h1, ws, ResponseError, SendResponse, ServiceConfig}; +use actix_http::{body::BodySize, h1, ws, ResponseError, ServiceConfig}; fn ws_service(req: ws::Frame) -> impl Future { match req { @@ -46,26 +46,34 @@ fn test_simple() { match ws::verify_handshake(&req) { Err(e) => { // validation failed + let res = e.error_response(); Either::A( - SendResponse::send(framed, e.error_response()) + framed + .send(h1::Message::Item(( + res.drop_body(), + BodySize::Empty, + ))) .map_err(|_| ()) .map(|_| ()), ) } Ok(_) => { + let res = ws::handshake_response(&req).finish(); Either::B( // send handshake response - SendResponse::send( - framed, - ws::handshake_response(&req).finish(), - ) - .map_err(|_| ()) - .and_then(|framed| { - // start websocket service - let framed = framed.into_framed(ws::Codec::new()); - ws::Transport::with(framed, ws_service) - .map_err(|_| ()) - }), + framed + .send(h1::Message::Item(( + res.drop_body(), + BodySize::None, + ))) + .map_err(|_| ()) + .and_then(|framed| { + // start websocket service + let framed = + framed.into_framed(ws::Codec::new()); + ws::Transport::with(framed, ws_service) + .map_err(|_| ()) + }), ) } } diff --git a/src/lib.rs b/src/lib.rs index a668b83be..39c054bc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -189,6 +189,7 @@ pub mod client { pub use awc::error::{ ConnectError, InvalidUrl, PayloadError, SendRequestError, WsClientError, }; - pub use awc::{test, Client, ClientBuilder, ClientRequest, ClientResponse, - Connector}; + pub use awc::{ + test, Client, ClientBuilder, ClientRequest, ClientResponse, Connector, + }; }