diff --git a/examples/basic.rs b/examples/basic.rs new file mode 100644 index 00000000..fd21320c --- /dev/null +++ b/examples/basic.rs @@ -0,0 +1,95 @@ +//! simple composite service +//! to test: curl https://127.0.0.1:8443/ -k +extern crate actix; +extern crate actix_net; +extern crate futures; +extern crate openssl; +extern crate tokio_io; +extern crate tokio_openssl; +extern crate tokio_tcp; + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use std::{fmt, io}; + +use futures::{future, Future}; +use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_openssl::SslAcceptorExt; + +use actix_net::service::{IntoNewService, NewServiceExt}; +use actix_net::Server; + +/// Simple logger service, it just prints fact of the new connections +fn logger( + stream: T, +) -> impl Future { + println!("New connection: {:?}", stream); + future::ok(stream) +} + +/// Stateful service, counts number of connections, `ServiceState` is a state +/// for the service +#[derive(Debug)] +struct ServiceState { + num: Arc, +} + +/// Service function for our stateful service +fn service( + st: &mut ServiceState, _stream: T, +) -> impl Future { + let num = st.num.fetch_add(1, Ordering::Relaxed); + println!("got ssl connection {:?}", num); + future::ok(()) +} + +fn main() { + let sys = actix::System::new("test"); + + // load ssl keys + let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + builder + .set_private_key_file("./examples/key.pem", SslFiletype::PEM) + .unwrap(); + builder + .set_certificate_chain_file("./examples/cert.pem") + .unwrap(); + let acceptor = builder.build(); + + let num = Arc::new(AtomicUsize::new(0)); + + // configure service pipeline + let srv = + // service for converting incoming TcpStream to a SslStream + (move |stream| { + SslAcceptorExt::accept_async(&acceptor, stream) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + }) + // convert closure to a `NewService` and convert init error + .into_new_service().map_init_err(|_| io::Error::new(io::ErrorKind::Other, "")) + + // .and_then() combinator uses other service to convert incoming `Request` to a `Response` + // and then uses that response as an input for next service. + // in this case, on success we use `logger` service + .and_then( + // convert function to a Service and related NewService impl + logger.into_new_service() + // if service is a function actix-net generate NewService impl with InitError=(), + // but actix_net::Server requires io::Error as InitError, + // we need to convert it to a `io::Error` + .map_init_err(|_| io::Error::new(io::ErrorKind::Other, ""))) + + // next service uses two components, service state and service function + // actix-net generates `NewService` impl that creates `ServiceState` instance for each new service + // and use `service` function as `Service::call` + .and_then((service, move || { + Ok::<_, io::Error>(ServiceState { num: num.clone() }) + })); + + Server::new().bind("0.0.0.0:8443", srv).unwrap().start(); + + sys.run(); +} diff --git a/src/server.rs b/src/server.rs index e92ac017..2761a3b5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -164,7 +164,8 @@ impl Server { N::Error: fmt::Display, { let token = Token(self.services.len()); - self.services.push(ServerNewService::create(srv.into())); + self.services + .push(ServerNewService::create(srv.into_new_service())); self.sockets.push((token, lst)); self } diff --git a/src/service.rs b/src/service.rs index 3e65a8e6..5af6b3d4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -74,7 +74,7 @@ where T: NewService, { /// Create service - fn into(self) -> T; + fn into_new_service(self) -> T; } impl IntoService for T @@ -90,7 +90,7 @@ impl IntoNewService for T where T: NewService, { - fn into(self) -> T { + fn into_new_service(self) -> T { self } } @@ -198,7 +198,7 @@ where F: Fn(Req) -> Fut + Clone + 'static, Fut: IntoFuture, { - fn into(self) -> FnNewService { + fn into_new_service(self) -> FnNewService { FnNewService::new(self) } } @@ -334,7 +334,9 @@ where Err1: 'static, Err2: 'static, { - fn into(self) -> FnStateNewService { + fn into_new_service( + self, + ) -> FnStateNewService { FnStateNewService::new(self.0, self.1) } } @@ -456,7 +458,10 @@ where { /// Create new `AndThen` combinator pub fn new>(a: A, f: F) -> Self { - Self { a, b: f.into() } + Self { + a, + b: f.into_new_service(), + } } } @@ -640,6 +645,20 @@ where } } +impl Clone for MapErrNewService +where + A: NewService + Clone, + F: Fn(A::InitError) -> E + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + f: self.f.clone(), + e: marker::PhantomData, + } + } +} + impl NewService for MapErrNewService where A: NewService + Clone, @@ -658,20 +677,6 @@ where } } -impl Clone for MapErrNewService -where - A: NewService + Clone, - F: Fn(A::Error) -> E + Clone, -{ - fn clone(&self) -> Self { - Self { - a: self.a.clone(), - f: self.f.clone(), - e: marker::PhantomData, - } - } -} - pub struct MapErrNewServiceFuture where A: NewService, @@ -730,6 +735,20 @@ where } } +impl Clone for MapInitErr +where + A: NewService + Clone, + F: Fn(A::InitError) -> E + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + f: self.f.clone(), + e: marker::PhantomData, + } + } +} + impl NewService for MapInitErr where A: NewService,