diff --git a/Cargo.toml b/Cargo.toml index 536806316..e17b72838 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ flate2-rust = ["flate2/rust_backend"] [dependencies] actix = "0.7.0" actix-net = { git="https://github.com/actix/actix-net.git" } +#actix-net = { path = "../actix-net" } base64 = "0.9" bitflags = "1.0" diff --git a/src/middleware/cors.rs b/src/middleware/cors.rs index f1adf0c4b..953f2911c 100644 --- a/src/middleware/cors.rs +++ b/src/middleware/cors.rs @@ -1127,12 +1127,23 @@ mod tests { let resp: HttpResponse = HttpResponse::Ok().into(); let resp = cors.response(&req, resp).unwrap().response(); - let origins_str = resp.headers().get(header::ACCESS_CONTROL_ALLOW_ORIGIN).unwrap().to_str().unwrap(); + let origins_str = resp + .headers() + .get(header::ACCESS_CONTROL_ALLOW_ORIGIN) + .unwrap() + .to_str() + .unwrap(); if origins_str.starts_with("https://www.example.com") { - assert_eq!("https://www.example.com, https://www.google.com", origins_str); + assert_eq!( + "https://www.example.com, https://www.google.com", + origins_str + ); } else { - assert_eq!("https://www.google.com, https://www.example.com", origins_str); + assert_eq!( + "https://www.google.com, https://www.example.com", + origins_str + ); } } diff --git a/src/payload.rs b/src/payload.rs index 382c0b0f5..2131e3c3c 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -1,8 +1,8 @@ //! Payload stream use bytes::{Bytes, BytesMut}; -use futures::task::Task; #[cfg(not(test))] use futures::task::current as current_task; +use futures::task::Task; use futures::{Async, Poll, Stream}; use std::cell::RefCell; use std::cmp; diff --git a/src/server/builder.rs b/src/server/builder.rs new file mode 100644 index 000000000..4a77bcd5c --- /dev/null +++ b/src/server/builder.rs @@ -0,0 +1,257 @@ +use std::marker::PhantomData; +use std::net; + +use actix_net::server; +use actix_net::service::{NewService, NewServiceExt, Service}; +use futures::future::{ok, FutureResult}; +use futures::{Async, Poll}; +use tokio_tcp::TcpStream; + +use super::handler::IntoHttpHandler; +use super::service::HttpService; +use super::{IoStream, KeepAlive}; + +pub(crate) trait ServiceFactory +where + H: IntoHttpHandler, +{ + fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server; +} + +pub struct HttpServiceBuilder +where + F: Fn() -> H + Send + Clone, +{ + factory: F, + acceptor: A, + pipeline: P, +} + +impl HttpServiceBuilder +where + F: Fn() -> H + Send + Clone, + H: IntoHttpHandler, + A: AcceptorServiceFactory, + P: HttpPipelineFactory, +{ + pub fn new(factory: F, acceptor: A, pipeline: P) -> Self { + Self { + factory, + pipeline, + acceptor, + } + } + + pub fn acceptor(self, acceptor: A1) -> HttpServiceBuilder + where + A1: AcceptorServiceFactory, + { + HttpServiceBuilder { + acceptor, + pipeline: self.pipeline, + factory: self.factory.clone(), + } + } + + pub fn pipeline(self, pipeline: P1) -> HttpServiceBuilder + where + P1: HttpPipelineFactory, + { + HttpServiceBuilder { + pipeline, + acceptor: self.acceptor, + factory: self.factory.clone(), + } + } + + fn finish(&self) -> impl server::StreamServiceFactory { + let pipeline = self.pipeline.clone(); + let acceptor = self.acceptor.clone(); + move || acceptor.create().and_then(pipeline.create()) + } +} + +impl Clone for HttpServiceBuilder +where + F: Fn() -> H + Send + Clone, + A: AcceptorServiceFactory, + P: HttpPipelineFactory, +{ + fn clone(&self) -> Self { + HttpServiceBuilder { + factory: self.factory.clone(), + acceptor: self.acceptor.clone(), + pipeline: self.pipeline.clone(), + } + } +} + +impl ServiceFactory for HttpServiceBuilder +where + F: Fn() -> H + Send + Clone, + A: AcceptorServiceFactory, + P: HttpPipelineFactory, + H: IntoHttpHandler, +{ + fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server { + server.listen("actix-web", lst, self.finish()) + } +} + +pub trait AcceptorServiceFactory: Send + Clone + 'static { + type Io: IoStream + Send; + type NewService: NewService< + Request = TcpStream, + Response = Self::Io, + Error = (), + InitError = (), + >; + + fn create(&self) -> Self::NewService; +} + +impl AcceptorServiceFactory for F +where + F: Fn() -> T + Send + Clone + 'static, + T::Response: IoStream + Send, + T: NewService, +{ + type Io = T::Response; + type NewService = T; + + fn create(&self) -> T { + (self)() + } +} + +pub trait HttpPipelineFactory: Send + Clone + 'static { + type Io: IoStream; + type NewService: NewService< + Request = Self::Io, + Response = (), + Error = (), + InitError = (), + >; + + fn create(&self) -> Self::NewService; +} + +impl HttpPipelineFactory for F +where + F: Fn() -> T + Send + Clone + 'static, + T: NewService, + T::Request: IoStream, +{ + type Io = T::Request; + type NewService = T; + + fn create(&self) -> T { + (self)() + } +} + +pub(crate) struct DefaultPipelineFactory +where + F: Fn() -> H + Send + Clone, +{ + factory: F, + host: Option, + addr: net::SocketAddr, + keep_alive: KeepAlive, + _t: PhantomData, +} + +impl DefaultPipelineFactory +where + Io: IoStream + Send, + F: Fn() -> H + Send + Clone + 'static, + H: IntoHttpHandler + 'static, +{ + pub fn new( + factory: F, host: Option, addr: net::SocketAddr, keep_alive: KeepAlive, + ) -> Self { + Self { + factory, + addr, + keep_alive, + host, + _t: PhantomData, + } + } +} + +impl Clone for DefaultPipelineFactory +where + Io: IoStream, + F: Fn() -> H + Send + Clone, + H: IntoHttpHandler, +{ + fn clone(&self) -> Self { + Self { + factory: self.factory.clone(), + addr: self.addr, + keep_alive: self.keep_alive, + host: self.host.clone(), + _t: PhantomData, + } + } +} + +impl HttpPipelineFactory for DefaultPipelineFactory +where + Io: IoStream + Send, + F: Fn() -> H + Send + Clone + 'static, + H: IntoHttpHandler + 'static, +{ + type Io = Io; + type NewService = HttpService; + + fn create(&self) -> Self::NewService { + HttpService::new( + self.factory.clone(), + self.addr, + self.host.clone(), + self.keep_alive, + ) + } +} + +#[derive(Clone)] +pub(crate) struct DefaultAcceptor; + +impl AcceptorServiceFactory for DefaultAcceptor { + type Io = TcpStream; + type NewService = DefaultAcceptor; + + fn create(&self) -> Self::NewService { + DefaultAcceptor + } +} + +impl NewService for DefaultAcceptor { + type Request = TcpStream; + type Response = TcpStream; + type Error = (); + type InitError = (); + type Service = DefaultAcceptor; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(DefaultAcceptor) + } +} + +impl Service for DefaultAcceptor { + type Request = TcpStream; + type Response = TcpStream; + type Error = (); + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + ok(req) + } +} diff --git a/src/server/h1.rs b/src/server/h1.rs index 5ae841bda..36d40e8d3 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -89,8 +89,8 @@ where H: HttpHandler + 'static, { pub fn new( - settings: WorkerSettings, stream: T, addr: Option, - buf: BytesMut, is_eof: bool, keepalive_timer: Option, + settings: WorkerSettings, stream: T, addr: Option, buf: BytesMut, + is_eof: bool, keepalive_timer: Option, ) -> Self { Http1 { flags: if is_eof { @@ -379,10 +379,7 @@ where fn push_response_entry(&mut self, status: StatusCode) { self.tasks.push_back(Entry { - pipe: EntryPipe::Error(ServerError::err( - Version::HTTP_11, - status, - )), + pipe: EntryPipe::Error(ServerError::err(Version::HTTP_11, status)), flags: EntryFlags::empty(), }); } diff --git a/src/server/http.rs b/src/server/http.rs index f67ebe959..f54900fc3 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -1,13 +1,10 @@ -use std::marker::PhantomData; -use std::{io, mem, net, time}; +use std::{io, mem, net}; -use actix::{Actor, Addr, AsyncContext, Context, Handler, System}; -use actix_net::server::{Server, ServerServiceFactory}; -use actix_net::service::{NewService, NewServiceExt, Service}; +use actix::{Addr, System}; +use actix_net::server; +use actix_net::service::NewService; use actix_net::ssl; -use futures::future::{ok, FutureResult}; -use futures::{Async, Poll, Stream}; use net2::TcpBuilder; use num_cpus; use tokio_tcp::TcpStream; @@ -21,9 +18,9 @@ use openssl::ssl::SslAcceptorBuilder; //#[cfg(feature = "rust-tls")] //use rustls::ServerConfig; -use super::channel::HttpChannel; -use super::settings::{ServerSettings, WorkerSettings}; -use super::{HttpHandler, IntoHttpHandler, IoStream, KeepAlive}; +use super::builder::{AcceptorServiceFactory, HttpServiceBuilder, ServiceFactory}; +use super::builder::{DefaultAcceptor, DefaultPipelineFactory}; +use super::{IntoHttpHandler, IoStream, KeepAlive}; struct Socket { scheme: &'static str, @@ -205,17 +202,16 @@ where lst, addr, scheme: "http", - handler: Box::new(SimpleFactory { - addr, - factory: self.factory.clone(), - pipeline: DefaultPipelineFactory { + handler: Box::new(HttpServiceBuilder::new( + self.factory.clone(), + DefaultAcceptor, + DefaultPipelineFactory::new( + self.factory.clone(), + self.host.clone(), addr, - factory: self.factory.clone(), - host: self.host.clone(), - keep_alive: self.keep_alive, - _t: PhantomData, - }, - }), + self.keep_alive, + ), + )), }); self @@ -239,6 +235,7 @@ where addr, scheme: "https", handler: Box::new(HttpServiceBuilder::new( + self.factory.clone(), acceptor, DefaultPipelineFactory::new( self.factory.clone(), @@ -346,6 +343,7 @@ where addr, scheme: "https", handler: Box::new(HttpServiceBuilder::new( + self.factory.clone(), acceptor.clone(), DefaultPipelineFactory::new( self.factory.clone(), @@ -493,10 +491,10 @@ impl H + Send + Clone> HttpServer { /// sys.run(); // <- Run actix system, this method starts all async processes /// } /// ``` - pub fn start(mut self) -> Addr { + pub fn start(mut self) -> Addr { ssl::max_concurrent_ssl_connect(self.maxconnrate); - let mut srv = Server::new() + let mut srv = server::Server::new() .workers(self.threads) .maxconn(self.maxconn) .shutdown_timeout(self.shutdown_timeout); @@ -605,143 +603,6 @@ impl H + Send + Clone> HttpServer { // } // } -struct HttpService -where - F: Fn() -> H, - H: IntoHttpHandler, - Io: IoStream, -{ - factory: F, - addr: net::SocketAddr, - host: Option, - keep_alive: KeepAlive, - _t: PhantomData, -} - -impl NewService for HttpService -where - F: Fn() -> H, - H: IntoHttpHandler, - Io: IoStream, -{ - type Request = Io; - type Response = (); - type Error = (); - type InitError = (); - type Service = HttpServiceHandler; - type Future = FutureResult; - - fn new_service(&self) -> Self::Future { - let s = ServerSettings::new(Some(self.addr), &self.host, false); - let app = (self.factory)().into_handler(); - - ok(HttpServiceHandler::new(app, self.keep_alive, s)) - } -} - -struct HttpServiceHandler -where - H: HttpHandler, - Io: IoStream, -{ - settings: WorkerSettings, - tcp_ka: Option, - _t: PhantomData, -} - -impl HttpServiceHandler -where - H: HttpHandler, - Io: IoStream, -{ - fn new( - app: H, keep_alive: KeepAlive, settings: ServerSettings, - ) -> HttpServiceHandler { - let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { - Some(time::Duration::new(val as u64, 0)) - } else { - None - }; - let settings = WorkerSettings::new(app, keep_alive, settings); - - HttpServiceHandler { - tcp_ka, - settings, - _t: PhantomData, - } - } -} - -impl Service for HttpServiceHandler -where - H: HttpHandler, - Io: IoStream, -{ - type Request = Io; - type Response = (); - type Error = (); - type Future = HttpChannel; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, mut req: Self::Request) -> Self::Future { - let _ = req.set_nodelay(true); - HttpChannel::new(self.settings.clone(), req, None) - } - - // fn shutdown(&self, force: bool) { - // if force { - // self.settings.head().traverse::(); - // } - // } -} - -trait ServiceFactory -where - H: IntoHttpHandler, -{ - fn register(&self, server: Server, lst: net::TcpListener) -> Server; -} - -struct SimpleFactory -where - H: IntoHttpHandler, - F: Fn() -> H + Send + Clone, - P: HttpPipelineFactory, -{ - pub addr: net::SocketAddr, - pub factory: F, - pub pipeline: P, -} - -impl Clone for SimpleFactory -where - P: HttpPipelineFactory, - F: Fn() -> H + Send + Clone, -{ - fn clone(&self) -> Self { - SimpleFactory { - addr: self.addr, - factory: self.factory.clone(), - pipeline: self.pipeline.clone(), - } - } -} - -impl ServiceFactory for SimpleFactory -where - H: IntoHttpHandler + 'static, - F: Fn() -> H + Send + Clone + 'static, - P: HttpPipelineFactory, -{ - fn register(&self, server: Server, lst: net::TcpListener) -> Server { - let pipeline = self.pipeline.clone(); - server.listen(lst, move || pipeline.create()) - } -} - fn create_tcp_listener( addr: net::SocketAddr, backlog: i32, ) -> io::Result { @@ -753,183 +614,3 @@ fn create_tcp_listener( builder.bind(addr)?; Ok(builder.listen(backlog)?) } - -pub struct HttpServiceBuilder { - acceptor: A, - pipeline: P, - t: PhantomData, -} - -impl HttpServiceBuilder -where - A: AcceptorServiceFactory, - P: HttpPipelineFactory, - H: IntoHttpHandler, -{ - pub fn new(acceptor: A, pipeline: P) -> Self { - Self { - acceptor, - pipeline, - t: PhantomData, - } - } - - pub fn acceptor(self, acceptor: A1) -> HttpServiceBuilder - where - A1: AcceptorServiceFactory, - { - HttpServiceBuilder { - acceptor, - pipeline: self.pipeline, - t: PhantomData, - } - } - - pub fn pipeline(self, pipeline: P1) -> HttpServiceBuilder - where - P1: HttpPipelineFactory, - { - HttpServiceBuilder { - pipeline, - acceptor: self.acceptor, - t: PhantomData, - } - } - - fn finish(&self) -> impl ServerServiceFactory { - let acceptor = self.acceptor.clone(); - let pipeline = self.pipeline.clone(); - - move || acceptor.create().and_then(pipeline.create()) - } -} - -impl ServiceFactory for HttpServiceBuilder -where - A: AcceptorServiceFactory, - P: HttpPipelineFactory, - H: IntoHttpHandler, -{ - fn register(&self, server: Server, lst: net::TcpListener) -> Server { - server.listen(lst, self.finish()) - } -} - -pub trait AcceptorServiceFactory: Send + Clone + 'static { - type Io: IoStream + Send; - type NewService: NewService< - Request = TcpStream, - Response = Self::Io, - Error = (), - InitError = (), - >; - - fn create(&self) -> Self::NewService; -} - -impl AcceptorServiceFactory for F -where - F: Fn() -> T + Send + Clone + 'static, - T::Response: IoStream + Send, - T: NewService, -{ - type Io = T::Response; - type NewService = T; - - fn create(&self) -> T { - (self)() - } -} - -pub trait HttpPipelineFactory: Send + Clone + 'static { - type Io: IoStream; - type NewService: NewService< - Request = Self::Io, - Response = (), - Error = (), - InitError = (), - >; - - fn create(&self) -> Self::NewService; -} - -impl HttpPipelineFactory for F -where - F: Fn() -> T + Send + Clone + 'static, - T: NewService, - T::Request: IoStream, -{ - type Io = T::Request; - type NewService = T; - - fn create(&self) -> T { - (self)() - } -} - -struct DefaultPipelineFactory -where - F: Fn() -> H + Send + Clone, -{ - factory: F, - host: Option, - addr: net::SocketAddr, - keep_alive: KeepAlive, - _t: PhantomData, -} - -impl DefaultPipelineFactory -where - Io: IoStream + Send, - F: Fn() -> H + Send + Clone + 'static, - H: IntoHttpHandler + 'static, -{ - fn new( - factory: F, host: Option, addr: net::SocketAddr, keep_alive: KeepAlive, - ) -> Self { - Self { - factory, - addr, - keep_alive, - host, - _t: PhantomData, - } - } -} - -impl Clone for DefaultPipelineFactory -where - Io: IoStream, - F: Fn() -> H + Send + Clone, - H: IntoHttpHandler, -{ - fn clone(&self) -> Self { - Self { - factory: self.factory.clone(), - addr: self.addr, - keep_alive: self.keep_alive, - host: self.host.clone(), - _t: PhantomData, - } - } -} - -impl HttpPipelineFactory for DefaultPipelineFactory -where - Io: IoStream + Send, - F: Fn() -> H + Send + Clone + 'static, - H: IntoHttpHandler + 'static, -{ - type Io = Io; - type NewService = HttpService; - - fn create(&self) -> Self::NewService { - HttpService { - addr: self.addr, - keep_alive: self.keep_alive, - host: self.host.clone(), - factory: self.factory.clone(), - _t: PhantomData, - } - } -} diff --git a/src/server/mod.rs b/src/server/mod.rs index 75f75fcde..ac4ffc9af 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -117,6 +117,7 @@ use tokio_tcp::TcpStream; pub use actix_net::server::{PauseServer, ResumeServer, StopServer}; +pub(crate) mod builder; mod channel; mod error; pub(crate) mod h1; @@ -130,6 +131,7 @@ mod http; pub(crate) mod input; pub(crate) mod message; pub(crate) mod output; +pub(crate) mod service; pub(crate) mod settings; mod ssl; diff --git a/src/server/service.rs b/src/server/service.rs new file mode 100644 index 000000000..6f80cd6df --- /dev/null +++ b/src/server/service.rs @@ -0,0 +1,133 @@ +use std::marker::PhantomData; +use std::net; +use std::time::Duration; + +use actix_net::service::{NewService, Service}; +use futures::future::{ok, FutureResult}; +use futures::{Async, Poll}; + +use super::channel::HttpChannel; +use super::handler::{HttpHandler, IntoHttpHandler}; +use super::settings::{ServerSettings, WorkerSettings}; +use super::{IoStream, KeepAlive}; + +pub enum HttpServiceMessage { + /// New stream + Connect(T), + /// Gracefull shutdown + Shutdown(Duration), + /// Force shutdown + ForceShutdown, +} + +pub(crate) struct HttpService +where + F: Fn() -> H, + H: IntoHttpHandler, + Io: IoStream, +{ + factory: F, + addr: net::SocketAddr, + host: Option, + keep_alive: KeepAlive, + _t: PhantomData, +} + +impl HttpService +where + F: Fn() -> H, + H: IntoHttpHandler, + Io: IoStream, +{ + pub fn new( + factory: F, addr: net::SocketAddr, host: Option, keep_alive: KeepAlive, + ) -> Self { + HttpService { + factory, + addr, + host, + keep_alive, + _t: PhantomData, + } + } +} + +impl NewService for HttpService +where + F: Fn() -> H, + H: IntoHttpHandler, + Io: IoStream, +{ + type Request = Io; + type Response = (); + type Error = (); + type InitError = (); + type Service = HttpServiceHandler; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + let s = ServerSettings::new(Some(self.addr), &self.host, false); + let app = (self.factory)().into_handler(); + + ok(HttpServiceHandler::new(app, self.keep_alive, s)) + } +} + +pub(crate) struct HttpServiceHandler +where + H: HttpHandler, + Io: IoStream, +{ + settings: WorkerSettings, + tcp_ka: Option, + _t: PhantomData, +} + +impl HttpServiceHandler +where + H: HttpHandler, + Io: IoStream, +{ + fn new( + app: H, keep_alive: KeepAlive, settings: ServerSettings, + ) -> HttpServiceHandler { + let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { + Some(Duration::new(val as u64, 0)) + } else { + None + }; + let settings = WorkerSettings::new(app, keep_alive, settings); + + HttpServiceHandler { + tcp_ka, + settings, + _t: PhantomData, + } + } +} + +impl Service for HttpServiceHandler +where + H: HttpHandler, + Io: IoStream, +{ + type Request = Io; + type Response = (); + type Error = (); + type Future = HttpChannel; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, mut req: Self::Request) -> Self::Future { + let _ = req.set_nodelay(true); + HttpChannel::new(self.settings.clone(), req, None) + } + + // fn shutdown(&self, force: bool) { + // if force { + // self.settings.head().traverse::(); + // } + // } +} diff --git a/src/server/settings.rs b/src/server/settings.rs index 6b2fc7270..21ce27195 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -2,7 +2,7 @@ use std::cell::{RefCell, RefMut, UnsafeCell}; use std::collections::VecDeque; use std::fmt::Write; use std::rc::Rc; -use std::time::{Instant, Duration}; +use std::time::{Duration, Instant}; use std::{env, fmt, net}; use bytes::BytesMut; @@ -12,8 +12,8 @@ use http::StatusCode; use lazycell::LazyCell; use parking_lot::Mutex; use time; -use tokio_timer::{sleep, Delay}; use tokio_current_thread::spawn; +use tokio_timer::{sleep, Delay}; use super::channel::Node; use super::message::{Request, RequestPool}; @@ -183,9 +183,7 @@ impl WorkerSettings { pub fn keep_alive_timer(&self) -> Option { let ka = self.0.keep_alive; if ka != 0 { - Some(Delay::new( - Instant::now() + Duration::from_secs(ka), - )) + Some(Delay::new(Instant::now() + Duration::from_secs(ka))) } else { None } diff --git a/tests/test_server.rs b/tests/test_server.rs index 41f4bcf39..c1dbf531d 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -10,9 +10,9 @@ extern crate http as modhttp; extern crate rand; extern crate tokio; extern crate tokio_current_thread; +extern crate tokio_current_thread as current_thread; extern crate tokio_reactor; extern crate tokio_tcp; -extern crate tokio_current_thread as current_thread; use std::io::{Read, Write}; use std::sync::Arc;