1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-27 18:49:01 +02:00

add actix-framed

This commit is contained in:
Nikolay Kim
2019-04-10 15:06:27 -07:00
parent 52aebb3bca
commit 8dc4a88aa6
11 changed files with 909 additions and 0 deletions

215
actix-framed/src/app.rs Normal file
View File

@ -0,0 +1,215 @@
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::h1::{Codec, SendResponse};
use actix_http::{Error, Request, Response};
use actix_router::{Path, Router, Url};
use actix_service::{IntoNewService, NewService, Service};
use actix_utils::cloneable::CloneableService;
use futures::{Async, Future, Poll};
use crate::helpers::{BoxedHttpNewService, BoxedHttpService, HttpNewService};
use crate::request::FramedRequest;
use crate::state::State;
type BoxedResponse = Box<Future<Item = (), Error = Error>>;
pub trait HttpServiceFactory {
type Factory: NewService;
fn path(&self) -> &str;
fn create(self) -> Self::Factory;
}
/// Application builder
pub struct App<T, S = ()> {
state: State<S>,
services: Vec<(String, BoxedHttpNewService<FramedRequest<T, S>>)>,
}
impl<T: 'static> App<T, ()> {
pub fn new() -> Self {
App {
state: State::new(()),
services: Vec::new(),
}
}
}
impl<T: 'static, S: 'static> App<T, S> {
pub fn with(state: S) -> App<T, S> {
App {
services: Vec::new(),
state: State::new(state),
}
}
pub fn service<U>(mut self, factory: U) -> Self
where
U: HttpServiceFactory,
U::Factory: NewService<
Request = FramedRequest<T, S>,
Response = (),
Error = Error,
InitError = (),
> + 'static,
<U::Factory as NewService>::Future: 'static,
<U::Factory as NewService>::Service: Service<
Request = FramedRequest<T, S>,
Response = (),
Error = Error,
Future = Box<Future<Item = (), Error = Error>>,
>,
{
let path = factory.path().to_string();
self.services
.push((path, Box::new(HttpNewService::new(factory.create()))));
self
}
}
impl<T, S> IntoNewService<AppFactory<T, S>> for App<T, S>
where
T: AsyncRead + AsyncWrite + 'static,
S: 'static,
{
fn into_new_service(self) -> AppFactory<T, S> {
AppFactory {
state: self.state,
services: Rc::new(self.services),
}
}
}
#[derive(Clone)]
pub struct AppFactory<T, S> {
state: State<S>,
services: Rc<Vec<(String, BoxedHttpNewService<FramedRequest<T, S>>)>>,
}
impl<T, S> NewService for AppFactory<T, S>
where
T: AsyncRead + AsyncWrite + 'static,
S: 'static,
{
type Request = (Request, Framed<T, Codec>);
type Response = ();
type Error = Error;
type InitError = ();
type Service = CloneableService<AppService<T, S>>;
type Future = CreateService<T, S>;
fn new_service(&self, _: &()) -> Self::Future {
CreateService {
fut: self
.services
.iter()
.map(|(path, service)| {
CreateServiceItem::Future(
Some(path.clone()),
service.new_service(&()),
)
})
.collect(),
state: self.state.clone(),
}
}
}
#[doc(hidden)]
pub struct CreateService<T, S> {
fut: Vec<CreateServiceItem<T, S>>,
state: State<S>,
}
enum CreateServiceItem<T, S> {
Future(
Option<String>,
Box<Future<Item = BoxedHttpService<FramedRequest<T, S>>, Error = ()>>,
),
Service(String, BoxedHttpService<FramedRequest<T, S>>),
}
impl<S: 'static, T: 'static> Future for CreateService<T, S>
where
T: AsyncRead + AsyncWrite,
{
type Item = CloneableService<AppService<T, S>>;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut done = true;
// poll http services
for item in &mut self.fut {
let res = match item {
CreateServiceItem::Future(ref mut path, ref mut fut) => {
match fut.poll()? {
Async::Ready(service) => Some((path.take().unwrap(), service)),
Async::NotReady => {
done = false;
None
}
}
}
CreateServiceItem::Service(_, _) => continue,
};
if let Some((path, service)) = res {
*item = CreateServiceItem::Service(path, service);
}
}
if done {
let router = self
.fut
.drain(..)
.fold(Router::build(), |mut router, item| {
match item {
CreateServiceItem::Service(path, service) => {
router.path(&path, service);
}
CreateServiceItem::Future(_, _) => unreachable!(),
}
router
});
Ok(Async::Ready(CloneableService::new(AppService {
router: router.finish(),
state: self.state.clone(),
})))
} else {
Ok(Async::NotReady)
}
}
}
pub struct AppService<T, S> {
state: State<S>,
router: Router<BoxedHttpService<FramedRequest<T, S>>>,
}
impl<S: 'static, T: 'static> Service for AppService<T, S>
where
T: AsyncRead + AsyncWrite,
{
type Request = (Request, Framed<T, Codec>);
type Response = ();
type Error = Error;
type Future = BoxedResponse;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, (req, framed): (Request, Framed<T, Codec>)) -> Self::Future {
let mut path = Path::new(Url::new(req.uri().clone()));
if let Some((srv, _info)) = self.router.recognize_mut(&mut path) {
return srv.call(FramedRequest::new(req, framed, self.state.clone()));
}
Box::new(
SendResponse::new(framed, Response::NotFound().finish()).then(|_| Ok(())),
)
}
}

View File

@ -0,0 +1,88 @@
use actix_http::Error;
use actix_service::{NewService, Service};
use futures::{Future, Poll};
pub(crate) type BoxedHttpService<Req> = Box<
Service<
Request = Req,
Response = (),
Error = Error,
Future = Box<Future<Item = (), Error = Error>>,
>,
>;
pub(crate) type BoxedHttpNewService<Req> = Box<
NewService<
Request = Req,
Response = (),
Error = Error,
InitError = (),
Service = BoxedHttpService<Req>,
Future = Box<Future<Item = BoxedHttpService<Req>, Error = ()>>,
>,
>;
pub(crate) struct HttpNewService<T: NewService>(T);
impl<T> HttpNewService<T>
where
T: NewService<Response = (), Error = Error>,
T::Response: 'static,
T::Future: 'static,
T::Service: Service<Future = Box<Future<Item = (), Error = Error>>> + 'static,
<T::Service as Service>::Future: 'static,
{
pub fn new(service: T) -> Self {
HttpNewService(service)
}
}
impl<T> NewService for HttpNewService<T>
where
T: NewService<Response = (), Error = Error>,
T::Request: 'static,
T::Future: 'static,
T::Service: Service<Future = Box<Future<Item = (), Error = Error>>> + 'static,
<T::Service as Service>::Future: 'static,
{
type Request = T::Request;
type Response = ();
type Error = Error;
type InitError = ();
type Service = BoxedHttpService<T::Request>;
type Future = Box<Future<Item = Self::Service, Error = ()>>;
fn new_service(&self, _: &()) -> Self::Future {
Box::new(self.0.new_service(&()).map_err(|_| ()).and_then(|service| {
let service: BoxedHttpService<_> = Box::new(HttpServiceWrapper { service });
Ok(service)
}))
}
}
struct HttpServiceWrapper<T: Service> {
service: T,
}
impl<T> Service for HttpServiceWrapper<T>
where
T: Service<
Response = (),
Future = Box<Future<Item = (), Error = Error>>,
Error = Error,
>,
T::Request: 'static,
{
type Request = T::Request;
type Response = ();
type Error = Error;
type Future = Box<Future<Item = (), Error = Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.service.call(req)
}
}

13
actix-framed/src/lib.rs Normal file
View File

@ -0,0 +1,13 @@
mod app;
mod helpers;
mod request;
mod route;
mod state;
// re-export for convinience
pub use actix_http::{http, Error, HttpMessage, Response, ResponseError};
pub use self::app::{App, AppService};
pub use self::request::FramedRequest;
pub use self::route::FramedRoute;
pub use self::state::State;

View File

@ -0,0 +1,30 @@
use actix_codec::Framed;
use actix_http::{h1::Codec, Request};
use crate::state::State;
pub struct FramedRequest<Io, S = ()> {
req: Request,
framed: Framed<Io, Codec>,
state: State<S>,
}
impl<Io, S> FramedRequest<Io, S> {
pub fn new(req: Request, framed: Framed<Io, Codec>, state: State<S>) -> Self {
Self { req, framed, state }
}
}
impl<Io, S> FramedRequest<Io, S> {
pub fn request(&self) -> &Request {
&self.req
}
pub fn request_mut(&mut self) -> &mut Request {
&mut self.req
}
pub fn into_parts(self) -> (Request, Framed<Io, Codec>, State<S>) {
(self.req, self.framed, self.state)
}
}

189
actix-framed/src/route.rs Normal file
View File

@ -0,0 +1,189 @@
use std::fmt;
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_http::{http::Method, Error};
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, IntoFuture, Poll};
use log::error;
use crate::app::HttpServiceFactory;
use crate::request::FramedRequest;
/// Resource route definition
///
/// Route uses builder-like pattern for configuration.
/// If handler is not explicitly set, default *404 Not Found* handler is used.
pub struct FramedRoute<Io, S, F, R> {
handler: F,
pattern: String,
methods: Vec<Method>,
state: PhantomData<(Io, S, R)>,
}
impl<Io, S> FramedRoute<Io, S, (), ()> {
pub fn build(path: &str) -> FramedRouteBuilder<Io, S> {
FramedRouteBuilder::new(path)
}
pub fn get(path: &str) -> FramedRouteBuilder<Io, S> {
FramedRouteBuilder::new(path).method(Method::GET)
}
pub fn post(path: &str) -> FramedRouteBuilder<Io, S> {
FramedRouteBuilder::new(path).method(Method::POST)
}
pub fn put(path: &str) -> FramedRouteBuilder<Io, S> {
FramedRouteBuilder::new(path).method(Method::PUT)
}
pub fn delete(path: &str) -> FramedRouteBuilder<Io, S> {
FramedRouteBuilder::new(path).method(Method::DELETE)
}
}
impl<Io, S, F, R> FramedRoute<Io, S, F, R>
where
F: FnMut(FramedRequest<Io, S>) -> R + Clone,
R: IntoFuture<Item = ()>,
R::Future: 'static,
R::Error: fmt::Display,
{
pub fn new(pattern: &str, handler: F) -> Self {
FramedRoute {
handler,
pattern: pattern.to_string(),
methods: Vec::new(),
state: PhantomData,
}
}
pub fn method(mut self, method: Method) -> Self {
self.methods.push(method);
self
}
}
impl<Io, S, F, R> HttpServiceFactory for FramedRoute<Io, S, F, R>
where
Io: AsyncRead + AsyncWrite + 'static,
F: FnMut(FramedRequest<Io, S>) -> R + Clone,
R: IntoFuture<Item = ()>,
R::Future: 'static,
R::Error: fmt::Display,
{
type Factory = FramedRouteFactory<Io, S, F, R>;
fn path(&self) -> &str {
&self.pattern
}
fn create(self) -> Self::Factory {
FramedRouteFactory {
handler: self.handler,
methods: self.methods,
_t: PhantomData,
}
}
}
pub struct FramedRouteFactory<Io, S, F, R> {
handler: F,
methods: Vec<Method>,
_t: PhantomData<(Io, S, R)>,
}
impl<Io, S, F, R> NewService for FramedRouteFactory<Io, S, F, R>
where
Io: AsyncRead + AsyncWrite + 'static,
F: FnMut(FramedRequest<Io, S>) -> R + Clone,
R: IntoFuture<Item = ()>,
R::Future: 'static,
R::Error: fmt::Display,
{
type Request = FramedRequest<Io, S>;
type Response = ();
type Error = Error;
type InitError = ();
type Service = FramedRouteService<Io, S, F, R>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(FramedRouteService {
handler: self.handler.clone(),
methods: self.methods.clone(),
_t: PhantomData,
})
}
}
pub struct FramedRouteService<Io, S, F, R> {
handler: F,
methods: Vec<Method>,
_t: PhantomData<(Io, S, R)>,
}
impl<Io, S, F, R> Service for FramedRouteService<Io, S, F, R>
where
Io: AsyncRead + AsyncWrite + 'static,
F: FnMut(FramedRequest<Io, S>) -> R + Clone,
R: IntoFuture<Item = ()>,
R::Future: 'static,
R::Error: fmt::Display,
{
type Request = FramedRequest<Io, S>;
type Response = ();
type Error = Error;
type Future = Box<Future<Item = (), Error = Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: FramedRequest<Io, S>) -> Self::Future {
Box::new((self.handler)(req).into_future().then(|res| {
if let Err(e) = res {
error!("Error in request handler: {}", e);
}
Ok(())
}))
}
}
pub struct FramedRouteBuilder<Io, S> {
pattern: String,
methods: Vec<Method>,
state: PhantomData<(Io, S)>,
}
impl<Io, S> FramedRouteBuilder<Io, S> {
fn new(path: &str) -> FramedRouteBuilder<Io, S> {
FramedRouteBuilder {
pattern: path.to_string(),
methods: Vec::new(),
state: PhantomData,
}
}
pub fn method(mut self, method: Method) -> Self {
self.methods.push(method);
self
}
pub fn to<F, R>(self, handler: F) -> FramedRoute<Io, S, F, R>
where
F: FnMut(FramedRequest<Io, S>) -> R,
R: IntoFuture<Item = ()>,
R::Future: 'static,
R::Error: fmt::Debug,
{
FramedRoute {
handler,
pattern: self.pattern,
methods: self.methods,
state: PhantomData,
}
}
}

29
actix-framed/src/state.rs Normal file
View File

@ -0,0 +1,29 @@
use std::ops::Deref;
use std::sync::Arc;
/// Application state
pub struct State<S>(Arc<S>);
impl<S> State<S> {
pub fn new(state: S) -> State<S> {
State(Arc::new(state))
}
pub fn get_ref(&self) -> &S {
self.0.as_ref()
}
}
impl<S> Deref for State<S> {
type Target = S;
fn deref(&self) -> &S {
self.0.as_ref()
}
}
impl<S> Clone for State<S> {
fn clone(&self) -> State<S> {
State(self.0.clone())
}
}