mirror of
https://github.com/fafhrd91/actix-net
synced 2024-12-18 02:13:58 +01:00
add InOrder service
This commit is contained in:
parent
0063a26aab
commit
4be025926c
25
Cargo.toml
25
Cargo.toml
@ -1,18 +1,3 @@
|
|||||||
[package]
|
|
||||||
name = "actix-net"
|
|
||||||
version = "0.3.0"
|
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
|
||||||
description = "Actix net - framework for the compisible network services for Rust (experimental)"
|
|
||||||
readme = "README.md"
|
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
|
||||||
homepage = "https://actix.rs"
|
|
||||||
repository = "https://github.com/actix/actix-net.git"
|
|
||||||
documentation = "https://docs.rs/actix-net/"
|
|
||||||
categories = ["network-programming", "asynchronous"]
|
|
||||||
license = "MIT/Apache-2.0"
|
|
||||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
|
||||||
edition = "2018"
|
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [
|
members = [
|
||||||
"actix-codec",
|
"actix-codec",
|
||||||
@ -24,13 +9,3 @@ members = [
|
|||||||
"actix-utils",
|
"actix-utils",
|
||||||
"router",
|
"router",
|
||||||
]
|
]
|
||||||
|
|
||||||
[dev-dependencies]
|
|
||||||
actix-service = "0.2.0"
|
|
||||||
actix-codec = "0.1.0"
|
|
||||||
actix-rt = { version="0.1.0" }
|
|
||||||
actix-server = { version="0.2.0", features=["ssl"] }
|
|
||||||
env_logger = "0.5"
|
|
||||||
futures = "0.1.24"
|
|
||||||
openssl = { version="0.10" }
|
|
||||||
tokio-openssl = { version="0.3" }
|
|
||||||
|
@ -2,6 +2,11 @@
|
|||||||
|
|
||||||
## [0.2.1] - 2019-02-xx
|
## [0.2.1] - 2019-02-xx
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Add `InOrder` service. the service yields responses as they become available,
|
||||||
|
in the order that their originating requests were submitted to the service.
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
* Convert `Timeout` and `InFlight` services to a transforms
|
* Convert `Timeout` and `InFlight` services to a transforms
|
||||||
|
@ -18,7 +18,7 @@ name = "actix_utils"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-service = "0.2.0"
|
actix-service = "0.2.1"
|
||||||
actix-codec = "0.1.0"
|
actix-codec = "0.1.0"
|
||||||
bytes = "0.4"
|
bytes = "0.4"
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
@ -28,6 +28,3 @@ log = "0.4"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "0.1"
|
actix-rt = "0.1"
|
||||||
|
|
||||||
[patch.crates-io]
|
|
||||||
actix-service = { git = "https://github.com/actix/actix-net.git" }
|
|
||||||
|
@ -6,6 +6,7 @@ pub mod either;
|
|||||||
pub mod framed;
|
pub mod framed;
|
||||||
pub mod inflight;
|
pub mod inflight;
|
||||||
pub mod keepalive;
|
pub mod keepalive;
|
||||||
|
pub mod order;
|
||||||
pub mod stream;
|
pub mod stream;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
pub mod timeout;
|
pub mod timeout;
|
||||||
|
245
actix-utils/src/order.rs
Normal file
245
actix-utils/src/order.rs
Normal file
@ -0,0 +1,245 @@
|
|||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::fmt;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use actix_service::{NewTransform, Service, Transform};
|
||||||
|
use futures::future::{ok, FutureResult};
|
||||||
|
use futures::task::AtomicTask;
|
||||||
|
use futures::unsync::oneshot;
|
||||||
|
use futures::{Async, Future, Poll};
|
||||||
|
|
||||||
|
struct Record<I, E> {
|
||||||
|
rx: oneshot::Receiver<Result<I, E>>,
|
||||||
|
tx: oneshot::Sender<Result<I, E>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Timeout error
|
||||||
|
pub enum InOrderError<E> {
|
||||||
|
/// Service error
|
||||||
|
Service(E),
|
||||||
|
/// Service call dropped
|
||||||
|
Disconnected,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E> From<E> for InOrderError<E> {
|
||||||
|
fn from(err: E) -> Self {
|
||||||
|
InOrderError::Service(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: fmt::Debug> fmt::Debug for InOrderError<E> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
InOrderError::Service(e) => write!(f, "InOrderError::Service({:?})", e),
|
||||||
|
InOrderError::Disconnected => write!(f, "InOrderError::Disconnected"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// InOrder - The service will yield responses as they become available,
|
||||||
|
/// in the order that their originating requests were submitted to the service.
|
||||||
|
pub struct InOrder<S> {
|
||||||
|
_t: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> InOrder<S> {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { _t: PhantomData }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn service() -> Self {
|
||||||
|
Self { _t: PhantomData }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Default for InOrder<S> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> NewTransform<S> for InOrder<S>
|
||||||
|
where
|
||||||
|
S: Service,
|
||||||
|
S::Response: 'static,
|
||||||
|
S::Future: 'static,
|
||||||
|
S::Error: 'static,
|
||||||
|
{
|
||||||
|
type Request = S::Request;
|
||||||
|
type Response = S::Response;
|
||||||
|
type Error = InOrderError<S::Error>;
|
||||||
|
type InitError = ();
|
||||||
|
type Transform = InOrderService<S>;
|
||||||
|
type Future = FutureResult<Self::Transform, Self::InitError>;
|
||||||
|
|
||||||
|
fn new_transform(&self) -> Self::Future {
|
||||||
|
ok(InOrderService::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct InOrderService<S: Service> {
|
||||||
|
task: Rc<AtomicTask>,
|
||||||
|
acks: VecDeque<Record<S::Response, S::Error>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> InOrderService<S>
|
||||||
|
where
|
||||||
|
S: Service,
|
||||||
|
S::Response: 'static,
|
||||||
|
S::Future: 'static,
|
||||||
|
S::Error: 'static,
|
||||||
|
{
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
acks: VecDeque::new(),
|
||||||
|
task: Rc::new(AtomicTask::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Transform<S> for InOrderService<S>
|
||||||
|
where
|
||||||
|
S: Service,
|
||||||
|
S::Response: 'static,
|
||||||
|
S::Future: 'static,
|
||||||
|
S::Error: 'static,
|
||||||
|
{
|
||||||
|
type Request = S::Request;
|
||||||
|
type Response = S::Response;
|
||||||
|
type Error = InOrderError<S::Error>;
|
||||||
|
type Future = InOrderServiceResponse<S>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
self.task.register();
|
||||||
|
|
||||||
|
// check acks
|
||||||
|
while !self.acks.is_empty() {
|
||||||
|
let rec = self.acks.front_mut().unwrap();
|
||||||
|
match rec.rx.poll() {
|
||||||
|
Ok(Async::Ready(res)) => {
|
||||||
|
let rec = self.acks.pop_front().unwrap();
|
||||||
|
let _ = rec.tx.send(res);
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => break,
|
||||||
|
Err(oneshot::Canceled) => return Err(InOrderError::Disconnected),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future {
|
||||||
|
let (tx1, rx1) = oneshot::channel();
|
||||||
|
let (tx2, rx2) = oneshot::channel();
|
||||||
|
self.acks.push_back(Record { rx: rx1, tx: tx2 });
|
||||||
|
|
||||||
|
let task = self.task.clone();
|
||||||
|
tokio_current_thread::spawn(service.call(request).then(move |res| {
|
||||||
|
task.notify();
|
||||||
|
let _ = tx1.send(res);
|
||||||
|
Ok(())
|
||||||
|
}));
|
||||||
|
|
||||||
|
InOrderServiceResponse { rx: rx2 }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub struct InOrderServiceResponse<S: Service> {
|
||||||
|
rx: oneshot::Receiver<Result<S::Response, S::Error>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Service> Future for InOrderServiceResponse<S> {
|
||||||
|
type Item = S::Response;
|
||||||
|
type Error = InOrderError<S::Error>;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
match self.rx.poll() {
|
||||||
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
|
Ok(Async::Ready(Ok(res))) => Ok(Async::Ready(res)),
|
||||||
|
Ok(Async::Ready(Err(e))) => Err(e.into()),
|
||||||
|
Err(oneshot::Canceled) => Err(InOrderError::Disconnected),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use futures::future::{lazy, Future};
|
||||||
|
use futures::{stream::futures_unordered, sync::oneshot, Async, Poll, Stream};
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use actix_service::{Blank, Service, ServiceExt};
|
||||||
|
|
||||||
|
struct Srv;
|
||||||
|
|
||||||
|
impl Service for Srv {
|
||||||
|
type Request = oneshot::Receiver<usize>;
|
||||||
|
type Response = usize;
|
||||||
|
type Error = ();
|
||||||
|
type Future = Box<Future<Item = usize, Error = ()>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: oneshot::Receiver<usize>) -> Self::Future {
|
||||||
|
Box::new(req.map_err(|_| ()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SrvPoll<S: Service> {
|
||||||
|
s: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Service> Future for SrvPoll<S> {
|
||||||
|
type Item = ();
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<(), ()> {
|
||||||
|
let _ = self.s.poll_ready();
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_inorder() {
|
||||||
|
let (tx1, rx1) = oneshot::channel();
|
||||||
|
let (tx2, rx2) = oneshot::channel();
|
||||||
|
let (tx3, rx3) = oneshot::channel();
|
||||||
|
let (tx_stop, rx_stop) = oneshot::channel();
|
||||||
|
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let rx1 = rx1;
|
||||||
|
let rx2 = rx2;
|
||||||
|
let rx3 = rx3;
|
||||||
|
let tx_stop = tx_stop;
|
||||||
|
let _ = actix_rt::System::new("test").block_on(lazy(move || {
|
||||||
|
let mut srv = Blank::new().apply(InOrderService::new(), Srv);
|
||||||
|
|
||||||
|
let res1 = srv.call(rx1);
|
||||||
|
let res2 = srv.call(rx2);
|
||||||
|
let res3 = srv.call(rx3);
|
||||||
|
tokio_current_thread::spawn(SrvPoll { s: srv });
|
||||||
|
|
||||||
|
futures_unordered(vec![res1, res2, res3])
|
||||||
|
.collect()
|
||||||
|
.and_then(move |res: Vec<_>| {
|
||||||
|
assert_eq!(res, vec![1, 2, 3]);
|
||||||
|
let _ = tx_stop.send(());
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ = tx3.send(3);
|
||||||
|
std::thread::sleep(Duration::from_millis(50));
|
||||||
|
let _ = tx2.send(2);
|
||||||
|
let _ = tx1.send(1);
|
||||||
|
|
||||||
|
let _ = rx_stop.wait();
|
||||||
|
}
|
||||||
|
}
|
@ -212,7 +212,7 @@ mod tests {
|
|||||||
let wait_time = Duration::from_millis(150);
|
let wait_time = Duration::from_millis(150);
|
||||||
|
|
||||||
let res = actix_rt::System::new("test").block_on(lazy(|| {
|
let res = actix_rt::System::new("test").block_on(lazy(|| {
|
||||||
let timeout = BlankNewService::default()
|
let timeout = BlankNewService::<_, _, ()>::default()
|
||||||
.apply(Timeout::new(resolution), || Ok(SleepService(wait_time)));
|
.apply(Timeout::new(resolution), || Ok(SleepService(wait_time)));
|
||||||
if let Async::Ready(mut to) = timeout.new_service().poll().unwrap() {
|
if let Async::Ready(mut to) = timeout.new_service().poll().unwrap() {
|
||||||
to.call(())
|
to.call(())
|
||||||
|
Loading…
Reference in New Issue
Block a user