diff --git a/Cargo.toml b/Cargo.toml index 15221a84..665b0400 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,3 @@ -[package] -name = "actix-net" -version = "0.3.0" -authors = ["Nikolay Kim "] -description = "Actix net - framework for the compisible network services for Rust (experimental)" -readme = "README.md" -keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-net/" -categories = ["network-programming", "asynchronous"] -license = "MIT/Apache-2.0" -exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] -edition = "2018" - [workspace] members = [ "actix-codec", @@ -24,13 +9,3 @@ members = [ "actix-utils", "router", ] - -[dev-dependencies] -actix-service = "0.2.0" -actix-codec = "0.1.0" -actix-rt = { version="0.1.0" } -actix-server = { version="0.2.0", features=["ssl"] } -env_logger = "0.5" -futures = "0.1.24" -openssl = { version="0.10" } -tokio-openssl = { version="0.3" } diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index eef4f712..acce78b8 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -2,6 +2,11 @@ ## [0.2.1] - 2019-02-xx +### Added + +* Add `InOrder` service. the service yields responses as they become available, + in the order that their originating requests were submitted to the service. + ### Changed * Convert `Timeout` and `InFlight` services to a transforms diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 126dfd97..b7c0f64f 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -18,7 +18,7 @@ name = "actix_utils" path = "src/lib.rs" [dependencies] -actix-service = "0.2.0" +actix-service = "0.2.1" actix-codec = "0.1.0" bytes = "0.4" futures = "0.1" @@ -28,6 +28,3 @@ log = "0.4" [dev-dependencies] actix-rt = "0.1" - -[patch.crates-io] -actix-service = { git = "https://github.com/actix/actix-net.git" } diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 6fe70848..15bff92b 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -6,6 +6,7 @@ pub mod either; pub mod framed; pub mod inflight; pub mod keepalive; +pub mod order; pub mod stream; pub mod time; pub mod timeout; diff --git a/actix-utils/src/order.rs b/actix-utils/src/order.rs new file mode 100644 index 00000000..1f33cad1 --- /dev/null +++ b/actix-utils/src/order.rs @@ -0,0 +1,245 @@ +use std::collections::VecDeque; +use std::fmt; +use std::marker::PhantomData; +use std::rc::Rc; + +use actix_service::{NewTransform, Service, Transform}; +use futures::future::{ok, FutureResult}; +use futures::task::AtomicTask; +use futures::unsync::oneshot; +use futures::{Async, Future, Poll}; + +struct Record { + rx: oneshot::Receiver>, + tx: oneshot::Sender>, +} + +/// Timeout error +pub enum InOrderError { + /// Service error + Service(E), + /// Service call dropped + Disconnected, +} + +impl From for InOrderError { + fn from(err: E) -> Self { + InOrderError::Service(err) + } +} + +impl fmt::Debug for InOrderError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + InOrderError::Service(e) => write!(f, "InOrderError::Service({:?})", e), + InOrderError::Disconnected => write!(f, "InOrderError::Disconnected"), + } + } +} + +/// InOrder - The service will yield responses as they become available, +/// in the order that their originating requests were submitted to the service. +pub struct InOrder { + _t: PhantomData, +} + +impl InOrder { + pub fn new() -> Self { + Self { _t: PhantomData } + } + + pub fn service() -> Self { + Self { _t: PhantomData } + } +} + +impl Default for InOrder { + fn default() -> Self { + Self::new() + } +} + +impl NewTransform for InOrder +where + S: Service, + S::Response: 'static, + S::Future: 'static, + S::Error: 'static, +{ + type Request = S::Request; + type Response = S::Response; + type Error = InOrderError; + type InitError = (); + type Transform = InOrderService; + type Future = FutureResult; + + fn new_transform(&self) -> Self::Future { + ok(InOrderService::new()) + } +} + +pub struct InOrderService { + task: Rc, + acks: VecDeque>, +} + +impl InOrderService +where + S: Service, + S::Response: 'static, + S::Future: 'static, + S::Error: 'static, +{ + pub fn new() -> Self { + Self { + acks: VecDeque::new(), + task: Rc::new(AtomicTask::new()), + } + } +} + +impl Transform for InOrderService +where + S: Service, + S::Response: 'static, + S::Future: 'static, + S::Error: 'static, +{ + type Request = S::Request; + type Response = S::Response; + type Error = InOrderError; + type Future = InOrderServiceResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.task.register(); + + // check acks + while !self.acks.is_empty() { + let rec = self.acks.front_mut().unwrap(); + match rec.rx.poll() { + Ok(Async::Ready(res)) => { + let rec = self.acks.pop_front().unwrap(); + let _ = rec.tx.send(res); + } + Ok(Async::NotReady) => break, + Err(oneshot::Canceled) => return Err(InOrderError::Disconnected), + } + } + + Ok(Async::Ready(())) + } + + fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + self.acks.push_back(Record { rx: rx1, tx: tx2 }); + + let task = self.task.clone(); + tokio_current_thread::spawn(service.call(request).then(move |res| { + task.notify(); + let _ = tx1.send(res); + Ok(()) + })); + + InOrderServiceResponse { rx: rx2 } + } +} + +#[doc(hidden)] +pub struct InOrderServiceResponse { + rx: oneshot::Receiver>, +} + +impl Future for InOrderServiceResponse { + type Item = S::Response; + type Error = InOrderError; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(Ok(res))) => Ok(Async::Ready(res)), + Ok(Async::Ready(Err(e))) => Err(e.into()), + Err(oneshot::Canceled) => Err(InOrderError::Disconnected), + } + } +} + +#[cfg(test)] +mod tests { + use futures::future::{lazy, Future}; + use futures::{stream::futures_unordered, sync::oneshot, Async, Poll, Stream}; + + use std::time::Duration; + + use super::*; + use actix_service::{Blank, Service, ServiceExt}; + + struct Srv; + + impl Service for Srv { + type Request = oneshot::Receiver; + type Response = usize; + type Error = (); + type Future = Box>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: oneshot::Receiver) -> Self::Future { + Box::new(req.map_err(|_| ())) + } + } + + struct SrvPoll { + s: S, + } + + impl Future for SrvPoll { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let _ = self.s.poll_ready(); + Ok(Async::NotReady) + } + } + + #[test] + fn test_inorder() { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + let (tx_stop, rx_stop) = oneshot::channel(); + + std::thread::spawn(move || { + let rx1 = rx1; + let rx2 = rx2; + let rx3 = rx3; + let tx_stop = tx_stop; + let _ = actix_rt::System::new("test").block_on(lazy(move || { + let mut srv = Blank::new().apply(InOrderService::new(), Srv); + + let res1 = srv.call(rx1); + let res2 = srv.call(rx2); + let res3 = srv.call(rx3); + tokio_current_thread::spawn(SrvPoll { s: srv }); + + futures_unordered(vec![res1, res2, res3]) + .collect() + .and_then(move |res: Vec<_>| { + assert_eq!(res, vec![1, 2, 3]); + let _ = tx_stop.send(()); + Ok(()) + }) + })); + }); + + let _ = tx3.send(3); + std::thread::sleep(Duration::from_millis(50)); + let _ = tx2.send(2); + let _ = tx1.send(1); + + let _ = rx_stop.wait(); + } +} diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 7ed11388..1ada86f4 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -212,7 +212,7 @@ mod tests { let wait_time = Duration::from_millis(150); let res = actix_rt::System::new("test").block_on(lazy(|| { - let timeout = BlankNewService::default() + let timeout = BlankNewService::<_, _, ()>::default() .apply(Timeout::new(resolution), || Ok(SleepService(wait_time))); if let Async::Ready(mut to) = timeout.new_service().poll().unwrap() { to.call(())