1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

migrate actix-framed

This commit is contained in:
Nikolay Kim 2019-11-21 12:17:01 +06:00
parent 69cadcdedb
commit 95e2a0ef2e
8 changed files with 263 additions and 217 deletions

View File

@ -33,7 +33,7 @@ members = [
"actix-http", "actix-http",
"actix-cors", "actix-cors",
"actix-files", "actix-files",
#"actix-framed", "actix-framed",
#"actix-session", #"actix-session",
"actix-identity", "actix-identity",
#"actix-multipart", #"actix-multipart",

View File

@ -20,19 +20,20 @@ name = "actix_framed"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-codec = "0.1.2" actix-codec = "0.2.0-alpha.1"
actix-service = "0.4.2" actix-service = "1.0.0-alpha.1"
actix-router = "0.1.2" actix-router = "0.1.2"
actix-rt = "0.2.2" actix-rt = "1.0.0-alpha.1"
actix-http = "0.2.11" actix-http = "0.3.0-alpha.1"
actix-server-config = "0.1.1" actix-server-config = "0.3.0-alpha.1"
bytes = "0.4" bytes = "0.4"
futures = "0.1.25" futures = "0.3.1"
pin-project = "0.4.6"
log = "0.4" log = "0.4"
[dev-dependencies] [dev-dependencies]
actix-server = { version = "0.6.0", features=["openssl"] } actix-server = { version = "0.8.0-alpha.1", features=["openssl"] }
actix-connect = { version = "0.2.0", features=["openssl"] } actix-connect = { version = "0.3.0-alpha.1", features=["openssl"] }
actix-http-test = { version = "0.1.0", features=["openssl"] } actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] }
actix-utils = "0.4.0" actix-utils = "0.5.0-alpha.1"

View File

@ -1,21 +1,24 @@
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::h1::{Codec, SendResponse}; use actix_http::h1::{Codec, SendResponse};
use actix_http::{Error, Request, Response}; use actix_http::{Error, Request, Response};
use actix_router::{Path, Router, Url}; use actix_router::{Path, Router, Url};
use actix_server_config::ServerConfig; use actix_server_config::ServerConfig;
use actix_service::{IntoNewService, NewService, Service}; use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use futures::{Async, Future, Poll}; use futures::future::{ok, FutureExt, LocalBoxFuture};
use crate::helpers::{BoxedHttpNewService, BoxedHttpService, HttpNewService}; use crate::helpers::{BoxedHttpNewService, BoxedHttpService, HttpNewService};
use crate::request::FramedRequest; use crate::request::FramedRequest;
use crate::state::State; use crate::state::State;
type BoxedResponse = Box<dyn Future<Item = (), Error = Error>>; type BoxedResponse = LocalBoxFuture<'static, Result<(), Error>>;
pub trait HttpServiceFactory { pub trait HttpServiceFactory {
type Factory: NewService; type Factory: ServiceFactory;
fn path(&self) -> &str; fn path(&self) -> &str;
@ -48,19 +51,19 @@ impl<T: 'static, S: 'static> FramedApp<T, S> {
pub fn service<U>(mut self, factory: U) -> Self pub fn service<U>(mut self, factory: U) -> Self
where where
U: HttpServiceFactory, U: HttpServiceFactory,
U::Factory: NewService< U::Factory: ServiceFactory<
Config = (), Config = (),
Request = FramedRequest<T, S>, Request = FramedRequest<T, S>,
Response = (), Response = (),
Error = Error, Error = Error,
InitError = (), InitError = (),
> + 'static, > + 'static,
<U::Factory as NewService>::Future: 'static, <U::Factory as ServiceFactory>::Future: 'static,
<U::Factory as NewService>::Service: Service< <U::Factory as ServiceFactory>::Service: Service<
Request = FramedRequest<T, S>, Request = FramedRequest<T, S>,
Response = (), Response = (),
Error = Error, Error = Error,
Future = Box<dyn Future<Item = (), Error = Error>>, Future = LocalBoxFuture<'static, Result<(), Error>>,
>, >,
{ {
let path = factory.path().to_string(); let path = factory.path().to_string();
@ -70,12 +73,12 @@ impl<T: 'static, S: 'static> FramedApp<T, S> {
} }
} }
impl<T, S> IntoNewService<FramedAppFactory<T, S>> for FramedApp<T, S> impl<T, S> IntoServiceFactory<FramedAppFactory<T, S>> for FramedApp<T, S>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
S: 'static, S: 'static,
{ {
fn into_new_service(self) -> FramedAppFactory<T, S> { fn into_factory(self) -> FramedAppFactory<T, S> {
FramedAppFactory { FramedAppFactory {
state: self.state, state: self.state,
services: Rc::new(self.services), services: Rc::new(self.services),
@ -89,9 +92,9 @@ pub struct FramedAppFactory<T, S> {
services: Rc<Vec<(String, BoxedHttpNewService<FramedRequest<T, S>>)>>, services: Rc<Vec<(String, BoxedHttpNewService<FramedRequest<T, S>>)>>,
} }
impl<T, S> NewService for FramedAppFactory<T, S> impl<T, S> ServiceFactory for FramedAppFactory<T, S>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
S: 'static, S: 'static,
{ {
type Config = ServerConfig; type Config = ServerConfig;
@ -128,28 +131,30 @@ pub struct CreateService<T, S> {
enum CreateServiceItem<T, S> { enum CreateServiceItem<T, S> {
Future( Future(
Option<String>, Option<String>,
Box<dyn Future<Item = BoxedHttpService<FramedRequest<T, S>>, Error = ()>>, LocalBoxFuture<'static, Result<BoxedHttpService<FramedRequest<T, S>>, ()>>,
), ),
Service(String, BoxedHttpService<FramedRequest<T, S>>), Service(String, BoxedHttpService<FramedRequest<T, S>>),
} }
impl<S: 'static, T: 'static> Future for CreateService<T, S> impl<S: 'static, T: 'static> Future for CreateService<T, S>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite + Unpin,
{ {
type Item = FramedAppService<T, S>; type Output = Result<FramedAppService<T, S>, ()>;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut done = true; let mut done = true;
// poll http services // poll http services
for item in &mut self.fut { for item in &mut self.fut {
let res = match item { let res = match item {
CreateServiceItem::Future(ref mut path, ref mut fut) => { CreateServiceItem::Future(ref mut path, ref mut fut) => {
match fut.poll()? { match Pin::new(fut).poll(cx) {
Async::Ready(service) => Some((path.take().unwrap(), service)), Poll::Ready(Ok(service)) => {
Async::NotReady => { Some((path.take().unwrap(), service))
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
done = false; done = false;
None None
} }
@ -176,12 +181,12 @@ where
} }
router router
}); });
Ok(Async::Ready(FramedAppService { Poll::Ready(Ok(FramedAppService {
router: router.finish(), router: router.finish(),
state: self.state.clone(), state: self.state.clone(),
})) }))
} else { } else {
Ok(Async::NotReady) Poll::Pending
} }
} }
} }
@ -193,15 +198,15 @@ pub struct FramedAppService<T, S> {
impl<S: 'static, T: 'static> Service for FramedAppService<T, S> impl<S: 'static, T: 'static> Service for FramedAppService<T, S>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite + Unpin,
{ {
type Request = (Request, Framed<T, Codec>); type Request = (Request, Framed<T, Codec>);
type Response = (); type Response = ();
type Error = Error; type Error = Error;
type Future = BoxedResponse; type Future = BoxedResponse;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, (req, framed): (Request, Framed<T, Codec>)) -> Self::Future { fn call(&mut self, (req, framed): (Request, Framed<T, Codec>)) -> Self::Future {
@ -210,8 +215,8 @@ where
if let Some((srv, _info)) = self.router.recognize_mut(&mut path) { if let Some((srv, _info)) = self.router.recognize_mut(&mut path) {
return srv.call(FramedRequest::new(req, framed, path, self.state.clone())); return srv.call(FramedRequest::new(req, framed, path, self.state.clone()));
} }
Box::new( SendResponse::new(framed, Response::NotFound().finish())
SendResponse::new(framed, Response::NotFound().finish()).then(|_| Ok(())), .then(|_| ok(()))
) .boxed_local()
} }
} }

View File

@ -1,36 +1,38 @@
use std::task::{Context, Poll};
use actix_http::Error; use actix_http::Error;
use actix_service::{NewService, Service}; use actix_service::{Service, ServiceFactory};
use futures::{Future, Poll}; use futures::future::{FutureExt, LocalBoxFuture};
pub(crate) type BoxedHttpService<Req> = Box< pub(crate) type BoxedHttpService<Req> = Box<
dyn Service< dyn Service<
Request = Req, Request = Req,
Response = (), Response = (),
Error = Error, Error = Error,
Future = Box<dyn Future<Item = (), Error = Error>>, Future = LocalBoxFuture<'static, Result<(), Error>>,
>, >,
>; >;
pub(crate) type BoxedHttpNewService<Req> = Box< pub(crate) type BoxedHttpNewService<Req> = Box<
dyn NewService< dyn ServiceFactory<
Config = (), Config = (),
Request = Req, Request = Req,
Response = (), Response = (),
Error = Error, Error = Error,
InitError = (), InitError = (),
Service = BoxedHttpService<Req>, Service = BoxedHttpService<Req>,
Future = Box<dyn Future<Item = BoxedHttpService<Req>, Error = ()>>, Future = LocalBoxFuture<'static, Result<BoxedHttpService<Req>, ()>>,
>, >,
>; >;
pub(crate) struct HttpNewService<T: NewService>(T); pub(crate) struct HttpNewService<T: ServiceFactory>(T);
impl<T> HttpNewService<T> impl<T> HttpNewService<T>
where where
T: NewService<Response = (), Error = Error>, T: ServiceFactory<Response = (), Error = Error>,
T::Response: 'static, T::Response: 'static,
T::Future: 'static, T::Future: 'static,
T::Service: Service<Future = Box<dyn Future<Item = (), Error = Error>>> + 'static, T::Service: Service<Future = LocalBoxFuture<'static, Result<(), Error>>> + 'static,
<T::Service as Service>::Future: 'static, <T::Service as Service>::Future: 'static,
{ {
pub fn new(service: T) -> Self { pub fn new(service: T) -> Self {
@ -38,12 +40,12 @@ where
} }
} }
impl<T> NewService for HttpNewService<T> impl<T> ServiceFactory for HttpNewService<T>
where where
T: NewService<Config = (), Response = (), Error = Error>, T: ServiceFactory<Config = (), Response = (), Error = Error>,
T::Request: 'static, T::Request: 'static,
T::Future: 'static, T::Future: 'static,
T::Service: Service<Future = Box<dyn Future<Item = (), Error = Error>>> + 'static, T::Service: Service<Future = LocalBoxFuture<'static, Result<(), Error>>> + 'static,
<T::Service as Service>::Future: 'static, <T::Service as Service>::Future: 'static,
{ {
type Config = (); type Config = ();
@ -52,13 +54,19 @@ where
type Error = Error; type Error = Error;
type InitError = (); type InitError = ();
type Service = BoxedHttpService<T::Request>; type Service = BoxedHttpService<T::Request>;
type Future = Box<dyn Future<Item = Self::Service, Error = ()>>; type Future = LocalBoxFuture<'static, Result<Self::Service, ()>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
Box::new(self.0.new_service(&()).map_err(|_| ()).and_then(|service| { let fut = self.0.new_service(&());
let service: BoxedHttpService<_> = Box::new(HttpServiceWrapper { service });
Ok(service) async move {
})) fut.await.map_err(|_| ()).map(|service| {
let service: BoxedHttpService<_> =
Box::new(HttpServiceWrapper { service });
service
})
}
.boxed_local()
} }
} }
@ -70,7 +78,7 @@ impl<T> Service for HttpServiceWrapper<T>
where where
T: Service< T: Service<
Response = (), Response = (),
Future = Box<dyn Future<Item = (), Error = Error>>, Future = LocalBoxFuture<'static, Result<(), Error>>,
Error = Error, Error = Error,
>, >,
T::Request: 'static, T::Request: 'static,
@ -78,10 +86,10 @@ where
type Request = T::Request; type Request = T::Request;
type Response = (); type Response = ();
type Error = Error; type Error = Error;
type Future = Box<dyn Future<Item = (), Error = Error>>; type Future = LocalBoxFuture<'static, Result<(), Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready() self.service.poll_ready(cx)
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {

View File

@ -1,11 +1,12 @@
use std::fmt; use std::fmt;
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_http::{http::Method, Error}; use actix_http::{http::Method, Error};
use actix_service::{NewService, Service}; use actix_service::{Service, ServiceFactory};
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureExt, LocalBoxFuture, Ready};
use futures::{Async, Future, IntoFuture, Poll};
use log::error; use log::error;
use crate::app::HttpServiceFactory; use crate::app::HttpServiceFactory;
@ -15,11 +16,11 @@ use crate::request::FramedRequest;
/// ///
/// Route uses builder-like pattern for configuration. /// Route uses builder-like pattern for configuration.
/// If handler is not explicitly set, default *404 Not Found* handler is used. /// If handler is not explicitly set, default *404 Not Found* handler is used.
pub struct FramedRoute<Io, S, F = (), R = ()> { pub struct FramedRoute<Io, S, F = (), R = (), E = ()> {
handler: F, handler: F,
pattern: String, pattern: String,
methods: Vec<Method>, methods: Vec<Method>,
state: PhantomData<(Io, S, R)>, state: PhantomData<(Io, S, R, E)>,
} }
impl<Io, S> FramedRoute<Io, S> { impl<Io, S> FramedRoute<Io, S> {
@ -53,12 +54,12 @@ impl<Io, S> FramedRoute<Io, S> {
self self
} }
pub fn to<F, R>(self, handler: F) -> FramedRoute<Io, S, F, R> pub fn to<F, R, E>(self, handler: F) -> FramedRoute<Io, S, F, R, E>
where where
F: FnMut(FramedRequest<Io, S>) -> R, F: FnMut(FramedRequest<Io, S>) -> R,
R: IntoFuture<Item = ()>, R: Future<Output = Result<(), E>> + 'static,
R::Future: 'static,
R::Error: fmt::Debug, E: fmt::Debug,
{ {
FramedRoute { FramedRoute {
handler, handler,
@ -69,15 +70,14 @@ impl<Io, S> FramedRoute<Io, S> {
} }
} }
impl<Io, S, F, R> HttpServiceFactory for FramedRoute<Io, S, F, R> impl<Io, S, F, R, E> HttpServiceFactory for FramedRoute<Io, S, F, R, E>
where where
Io: AsyncRead + AsyncWrite + 'static, Io: AsyncRead + AsyncWrite + 'static,
F: FnMut(FramedRequest<Io, S>) -> R + Clone, F: FnMut(FramedRequest<Io, S>) -> R + Clone,
R: IntoFuture<Item = ()>, R: Future<Output = Result<(), E>> + 'static,
R::Future: 'static, E: fmt::Display,
R::Error: fmt::Display,
{ {
type Factory = FramedRouteFactory<Io, S, F, R>; type Factory = FramedRouteFactory<Io, S, F, R, E>;
fn path(&self) -> &str { fn path(&self) -> &str {
&self.pattern &self.pattern
@ -92,27 +92,26 @@ where
} }
} }
pub struct FramedRouteFactory<Io, S, F, R> { pub struct FramedRouteFactory<Io, S, F, R, E> {
handler: F, handler: F,
methods: Vec<Method>, methods: Vec<Method>,
_t: PhantomData<(Io, S, R)>, _t: PhantomData<(Io, S, R, E)>,
} }
impl<Io, S, F, R> NewService for FramedRouteFactory<Io, S, F, R> impl<Io, S, F, R, E> ServiceFactory for FramedRouteFactory<Io, S, F, R, E>
where where
Io: AsyncRead + AsyncWrite + 'static, Io: AsyncRead + AsyncWrite + 'static,
F: FnMut(FramedRequest<Io, S>) -> R + Clone, F: FnMut(FramedRequest<Io, S>) -> R + Clone,
R: IntoFuture<Item = ()>, R: Future<Output = Result<(), E>> + 'static,
R::Future: 'static, E: fmt::Display,
R::Error: fmt::Display,
{ {
type Config = (); type Config = ();
type Request = FramedRequest<Io, S>; type Request = FramedRequest<Io, S>;
type Response = (); type Response = ();
type Error = Error; type Error = Error;
type InitError = (); type InitError = ();
type Service = FramedRouteService<Io, S, F, R>; type Service = FramedRouteService<Io, S, F, R, E>;
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(FramedRouteService { ok(FramedRouteService {
@ -123,35 +122,38 @@ where
} }
} }
pub struct FramedRouteService<Io, S, F, R> { pub struct FramedRouteService<Io, S, F, R, E> {
handler: F, handler: F,
methods: Vec<Method>, methods: Vec<Method>,
_t: PhantomData<(Io, S, R)>, _t: PhantomData<(Io, S, R, E)>,
} }
impl<Io, S, F, R> Service for FramedRouteService<Io, S, F, R> impl<Io, S, F, R, E> Service for FramedRouteService<Io, S, F, R, E>
where where
Io: AsyncRead + AsyncWrite + 'static, Io: AsyncRead + AsyncWrite + 'static,
F: FnMut(FramedRequest<Io, S>) -> R + Clone, F: FnMut(FramedRequest<Io, S>) -> R + Clone,
R: IntoFuture<Item = ()>, R: Future<Output = Result<(), E>> + 'static,
R::Future: 'static, E: fmt::Display,
R::Error: fmt::Display,
{ {
type Request = FramedRequest<Io, S>; type Request = FramedRequest<Io, S>;
type Response = (); type Response = ();
type Error = Error; type Error = Error;
type Future = Box<dyn Future<Item = (), Error = Error>>; type Future = LocalBoxFuture<'static, Result<(), Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: FramedRequest<Io, S>) -> Self::Future { fn call(&mut self, req: FramedRequest<Io, S>) -> Self::Future {
Box::new((self.handler)(req).into_future().then(|res| { let fut = (self.handler)(req);
async move {
let res = fut.await;
if let Err(e) = res { if let Err(e) = res {
error!("Error in request handler: {}", e); error!("Error in request handler: {}", e);
} }
Ok(()) Ok(())
})) }
.boxed_local()
} }
} }

View File

@ -1,4 +1,6 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::body::BodySize; use actix_http::body::BodySize;
@ -6,9 +8,9 @@ use actix_http::error::ResponseError;
use actix_http::h1::{Codec, Message}; use actix_http::h1::{Codec, Message};
use actix_http::ws::{verify_handshake, HandshakeError}; use actix_http::ws::{verify_handshake, HandshakeError};
use actix_http::{Request, Response}; use actix_http::{Request, Response};
use actix_service::{NewService, Service}; use actix_service::{Service, ServiceFactory};
use futures::future::{ok, Either, FutureResult}; use futures::future::{err, ok, Either, Ready};
use futures::{Async, Future, IntoFuture, Poll, Sink}; use futures::Future;
/// Service that verifies incoming request if it is valid websocket /// Service that verifies incoming request if it is valid websocket
/// upgrade request. In case of error returns `HandshakeError` /// upgrade request. In case of error returns `HandshakeError`
@ -22,14 +24,14 @@ impl<T, C> Default for VerifyWebSockets<T, C> {
} }
} }
impl<T, C> NewService for VerifyWebSockets<T, C> { impl<T, C> ServiceFactory for VerifyWebSockets<T, C> {
type Config = C; type Config = C;
type Request = (Request, Framed<T, Codec>); type Request = (Request, Framed<T, Codec>);
type Response = (Request, Framed<T, Codec>); type Response = (Request, Framed<T, Codec>);
type Error = (HandshakeError, Framed<T, Codec>); type Error = (HandshakeError, Framed<T, Codec>);
type InitError = (); type InitError = ();
type Service = VerifyWebSockets<T, C>; type Service = VerifyWebSockets<T, C>;
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &C) -> Self::Future { fn new_service(&self, _: &C) -> Self::Future {
ok(VerifyWebSockets { _t: PhantomData }) ok(VerifyWebSockets { _t: PhantomData })
@ -40,16 +42,16 @@ impl<T, C> Service for VerifyWebSockets<T, C> {
type Request = (Request, Framed<T, Codec>); type Request = (Request, Framed<T, Codec>);
type Response = (Request, Framed<T, Codec>); type Response = (Request, Framed<T, Codec>);
type Error = (HandshakeError, Framed<T, Codec>); type Error = (HandshakeError, Framed<T, Codec>);
type Future = FutureResult<Self::Response, Self::Error>; type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, (req, framed): (Request, Framed<T, Codec>)) -> Self::Future { fn call(&mut self, (req, framed): (Request, Framed<T, Codec>)) -> Self::Future {
match verify_handshake(req.head()) { match verify_handshake(req.head()) {
Err(e) => Err((e, framed)).into_future(), Err(e) => err((e, framed)),
Ok(_) => Ok((req, framed)).into_future(), Ok(_) => ok((req, framed)),
} }
} }
} }
@ -67,9 +69,9 @@ where
} }
} }
impl<T, R, E, C> NewService for SendError<T, R, E, C> impl<T, R, E, C> ServiceFactory for SendError<T, R, E, C>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
R: 'static, R: 'static,
E: ResponseError + 'static, E: ResponseError + 'static,
{ {
@ -79,7 +81,7 @@ where
type Error = (E, Framed<T, Codec>); type Error = (E, Framed<T, Codec>);
type InitError = (); type InitError = ();
type Service = SendError<T, R, E, C>; type Service = SendError<T, R, E, C>;
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &C) -> Self::Future { fn new_service(&self, _: &C) -> Self::Future {
ok(SendError(PhantomData)) ok(SendError(PhantomData))
@ -88,25 +90,25 @@ where
impl<T, R, E, C> Service for SendError<T, R, E, C> impl<T, R, E, C> Service for SendError<T, R, E, C>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
R: 'static, R: 'static,
E: ResponseError + 'static, E: ResponseError + 'static,
{ {
type Request = Result<R, (E, Framed<T, Codec>)>; type Request = Result<R, (E, Framed<T, Codec>)>;
type Response = R; type Response = R;
type Error = (E, Framed<T, Codec>); type Error = (E, Framed<T, Codec>);
type Future = Either<FutureResult<R, (E, Framed<T, Codec>)>, SendErrorFut<T, R, E>>; type Future = Either<Ready<Result<R, (E, Framed<T, Codec>)>>, SendErrorFut<T, R, E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: Result<R, (E, Framed<T, Codec>)>) -> Self::Future { fn call(&mut self, req: Result<R, (E, Framed<T, Codec>)>) -> Self::Future {
match req { match req {
Ok(r) => Either::A(ok(r)), Ok(r) => Either::Left(ok(r)),
Err((e, framed)) => { Err((e, framed)) => {
let res = e.error_response().drop_body(); let res = e.error_response().drop_body();
Either::B(SendErrorFut { Either::Right(SendErrorFut {
framed: Some(framed), framed: Some(framed),
res: Some((res, BodySize::Empty).into()), res: Some((res, BodySize::Empty).into()),
err: Some(e), err: Some(e),
@ -117,6 +119,7 @@ where
} }
} }
#[pin_project::pin_project]
pub struct SendErrorFut<T, R, E> { pub struct SendErrorFut<T, R, E> {
res: Option<Message<(Response<()>, BodySize)>>, res: Option<Message<(Response<()>, BodySize)>>,
framed: Option<Framed<T, Codec>>, framed: Option<Framed<T, Codec>>,
@ -127,23 +130,27 @@ pub struct SendErrorFut<T, R, E> {
impl<T, R, E> Future for SendErrorFut<T, R, E> impl<T, R, E> Future for SendErrorFut<T, R, E>
where where
E: ResponseError, E: ResponseError,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite + Unpin,
{ {
type Item = R; type Output = Result<R, (E, Framed<T, Codec>)>;
type Error = (E, Framed<T, Codec>);
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(res) = self.res.take() { if let Some(res) = self.res.take() {
if self.framed.as_mut().unwrap().force_send(res).is_err() { if self.framed.as_mut().unwrap().write(res).is_err() {
return Err((self.err.take().unwrap(), self.framed.take().unwrap())); return Poll::Ready(Err((
self.err.take().unwrap(),
self.framed.take().unwrap(),
)));
} }
} }
match self.framed.as_mut().unwrap().poll_complete() { match self.framed.as_mut().unwrap().flush(cx) {
Ok(Async::Ready(_)) => { Poll::Ready(Ok(_)) => {
Err((self.err.take().unwrap(), self.framed.take().unwrap())) Poll::Ready(Err((self.err.take().unwrap(), self.framed.take().unwrap())))
} }
Ok(Async::NotReady) => Ok(Async::NotReady), Poll::Ready(Err(_)) => {
Err(_) => Err((self.err.take().unwrap(), self.framed.take().unwrap())), Poll::Ready(Err((self.err.take().unwrap(), self.framed.take().unwrap())))
}
Poll::Pending => Poll::Pending,
} }
} }
} }

View File

@ -1,4 +1,6 @@
//! Various helpers for Actix applications to use during testing. //! Various helpers for Actix applications to use during testing.
use std::future::Future;
use actix_codec::Framed; use actix_codec::Framed;
use actix_http::h1::Codec; use actix_http::h1::Codec;
use actix_http::http::header::{Header, HeaderName, IntoHeaderValue}; use actix_http::http::header::{Header, HeaderName, IntoHeaderValue};
@ -6,7 +8,6 @@ use actix_http::http::{HttpTryFrom, Method, Uri, Version};
use actix_http::test::{TestBuffer, TestRequest as HttpTestRequest}; use actix_http::test::{TestBuffer, TestRequest as HttpTestRequest};
use actix_router::{Path, Url}; use actix_router::{Path, Url};
use actix_rt::Runtime; use actix_rt::Runtime;
use futures::IntoFuture;
use crate::{FramedRequest, State}; use crate::{FramedRequest, State};
@ -121,10 +122,10 @@ impl<S> TestRequest<S> {
pub fn run<F, R, I, E>(self, f: F) -> Result<I, E> pub fn run<F, R, I, E>(self, f: F) -> Result<I, E>
where where
F: FnOnce(FramedRequest<TestBuffer, S>) -> R, F: FnOnce(FramedRequest<TestBuffer, S>) -> R,
R: IntoFuture<Item = I, Error = E>, R: Future<Output = Result<I, E>>,
{ {
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
rt.block_on(f(self.finish()).into_future()) rt.block_on(f(self.finish()))
} }
} }

View File

@ -1,30 +1,31 @@
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_http::{body, http::StatusCode, ws, Error, HttpService, Response}; use actix_http::{body, http::StatusCode, ws, Error, HttpService, Response};
use actix_http_test::TestServer; use actix_http_test::{block_on, TestServer};
use actix_service::{IntoNewService, NewService}; use actix_service::{pipeline_factory, IntoServiceFactory, ServiceFactory};
use actix_utils::framed::FramedTransport; use actix_utils::framed::FramedTransport;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::future::{self, ok}; use futures::{future, SinkExt, StreamExt};
use futures::{Future, Sink, Stream};
use actix_framed::{FramedApp, FramedRequest, FramedRoute, SendError, VerifyWebSockets}; use actix_framed::{FramedApp, FramedRequest, FramedRoute, SendError, VerifyWebSockets};
fn ws_service<T: AsyncRead + AsyncWrite>( async fn ws_service<T: AsyncRead + AsyncWrite>(
req: FramedRequest<T>, req: FramedRequest<T>,
) -> impl Future<Item = (), Error = Error> { ) -> Result<(), Error> {
let (req, framed, _) = req.into_parts(); let (req, mut framed, _) = req.into_parts();
let res = ws::handshake(req.head()).unwrap().message_body(()); let res = ws::handshake(req.head()).unwrap().message_body(());
framed framed
.send((res, body::BodySize::None).into()) .send((res, body::BodySize::None).into())
.map_err(|_| panic!()) .await
.and_then(|framed| { .unwrap();
FramedTransport::new(framed.into_framed(ws::Codec::new()), service) FramedTransport::new(framed.into_framed(ws::Codec::new()), service)
.map_err(|_| panic!()) .await
}) .unwrap();
Ok(())
} }
fn service(msg: ws::Frame) -> impl Future<Item = ws::Message, Error = Error> { async fn service(msg: ws::Frame) -> Result<ws::Message, Error> {
let msg = match msg { let msg = match msg {
ws::Frame::Ping(msg) => ws::Message::Pong(msg), ws::Frame::Ping(msg) => ws::Message::Pong(msg),
ws::Frame::Text(text) => { ws::Frame::Text(text) => {
@ -34,108 +35,129 @@ fn service(msg: ws::Frame) -> impl Future<Item = ws::Message, Error = Error> {
ws::Frame::Close(reason) => ws::Message::Close(reason), ws::Frame::Close(reason) => ws::Message::Close(reason),
_ => panic!(), _ => panic!(),
}; };
ok(msg) Ok(msg)
} }
#[test] #[test]
fn test_simple() { fn test_simple() {
let mut srv = TestServer::new(|| { block_on(async {
let mut srv = TestServer::start(|| {
HttpService::build() HttpService::build()
.upgrade( .upgrade(
FramedApp::new().service(FramedRoute::get("/index.html").to(ws_service)), FramedApp::new()
.service(FramedRoute::get("/index.html").to(ws_service)),
) )
.finish(|_| future::ok::<_, Error>(Response::NotFound())) .finish(|_| future::ok::<_, Error>(Response::NotFound()))
}); });
assert!(srv.ws_at("/test").is_err()); assert!(srv.ws_at("/test").await.is_err());
// client service // client service
let framed = srv.ws_at("/index.html").unwrap(); let mut framed = srv.ws_at("/index.html").await.unwrap();
let framed = srv framed
.block_on(framed.send(ws::Message::Text("text".to_string()))) .send(ws::Message::Text("text".to_string()))
.await
.unwrap(); .unwrap();
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); let (item, mut framed) = framed.into_future().await;
assert_eq!(item, Some(ws::Frame::Text(Some(BytesMut::from("text")))));
let framed = srv
.block_on(framed.send(ws::Message::Binary("text".into())))
.unwrap();
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
assert_eq!( assert_eq!(
item, item.unwrap().unwrap(),
Some(ws::Frame::Binary(Some(Bytes::from_static(b"text").into()))) ws::Frame::Text(Some(BytesMut::from("text")))
); );
let framed = srv framed
.block_on(framed.send(ws::Message::Ping("text".into()))) .send(ws::Message::Binary("text".into()))
.await
.unwrap(); .unwrap();
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); let (item, mut framed) = framed.into_future().await;
assert_eq!(item, Some(ws::Frame::Pong("text".to_string().into())));
let framed = srv
.block_on(framed.send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))))
.unwrap();
let (item, _framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
assert_eq!( assert_eq!(
item, item.unwrap().unwrap(),
Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into()))) ws::Frame::Binary(Some(Bytes::from_static(b"text").into()))
); );
framed.send(ws::Message::Ping("text".into())).await.unwrap();
let (item, mut framed) = framed.into_future().await;
assert_eq!(
item.unwrap().unwrap(),
ws::Frame::Pong("text".to_string().into())
);
framed
.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
.await
.unwrap();
let (item, _) = framed.into_future().await;
assert_eq!(
item.unwrap().unwrap(),
ws::Frame::Close(Some(ws::CloseCode::Normal.into()))
);
})
} }
#[test] #[test]
fn test_service() { fn test_service() {
let mut srv = TestServer::new(|| { block_on(async {
actix_http::h1::OneRequest::new().map_err(|_| ()).and_then( let mut srv = TestServer::start(|| {
VerifyWebSockets::default() pipeline_factory(actix_http::h1::OneRequest::new().map_err(|_| ())).and_then(
pipeline_factory(
pipeline_factory(VerifyWebSockets::default())
.then(SendError::default()) .then(SendError::default())
.map_err(|_| ()) .map_err(|_| ()),
)
.and_then( .and_then(
FramedApp::new() FramedApp::new()
.service(FramedRoute::get("/index.html").to(ws_service)) .service(FramedRoute::get("/index.html").to(ws_service))
.into_new_service() .into_factory()
.map_err(|_| ()), .map_err(|_| ()),
), ),
) )
}); });
// non ws request // non ws request
let res = srv.block_on(srv.get("/index.html").send()).unwrap(); let res = srv.get("/index.html").send().await.unwrap();
assert_eq!(res.status(), StatusCode::BAD_REQUEST); assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// not found // not found
assert!(srv.ws_at("/test").is_err()); assert!(srv.ws_at("/test").await.is_err());
// client service // client service
let framed = srv.ws_at("/index.html").unwrap(); let mut framed = srv.ws_at("/index.html").await.unwrap();
let framed = srv framed
.block_on(framed.send(ws::Message::Text("text".to_string()))) .send(ws::Message::Text("text".to_string()))
.await
.unwrap(); .unwrap();
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); let (item, mut framed) = framed.into_future().await;
assert_eq!(item, Some(ws::Frame::Text(Some(BytesMut::from("text")))));
let framed = srv
.block_on(framed.send(ws::Message::Binary("text".into())))
.unwrap();
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
assert_eq!( assert_eq!(
item, item.unwrap().unwrap(),
Some(ws::Frame::Binary(Some(Bytes::from_static(b"text").into()))) ws::Frame::Text(Some(BytesMut::from("text")))
); );
let framed = srv framed
.block_on(framed.send(ws::Message::Ping("text".into()))) .send(ws::Message::Binary("text".into()))
.await
.unwrap(); .unwrap();
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); let (item, mut framed) = framed.into_future().await;
assert_eq!(item, Some(ws::Frame::Pong("text".to_string().into())));
let framed = srv
.block_on(framed.send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))))
.unwrap();
let (item, _framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
assert_eq!( assert_eq!(
item, item.unwrap().unwrap(),
Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into()))) ws::Frame::Binary(Some(Bytes::from_static(b"text").into()))
); );
framed.send(ws::Message::Ping("text".into())).await.unwrap();
let (item, mut framed) = framed.into_future().await;
assert_eq!(
item.unwrap().unwrap(),
ws::Frame::Pong("text".to_string().into())
);
framed
.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
.await
.unwrap();
let (item, _) = framed.into_future().await;
assert_eq!(
item.unwrap().unwrap(),
ws::Frame::Close(Some(ws::CloseCode::Normal.into()))
);
})
} }