2019-02-03 14:05:13 -08:00
|
|
|
use std::collections::VecDeque;
|
2019-05-12 06:03:50 -07:00
|
|
|
use std::convert::Infallible;
|
2019-02-03 14:05:13 -08:00
|
|
|
use std::fmt;
|
2019-11-14 18:38:24 +06:00
|
|
|
use std::future::Future;
|
2019-02-03 14:05:13 -08:00
|
|
|
use std::marker::PhantomData;
|
2019-11-14 18:38:24 +06:00
|
|
|
use std::pin::Pin;
|
2019-12-12 06:56:45 +06:00
|
|
|
use std::rc::Rc;
|
2019-11-14 18:38:24 +06:00
|
|
|
use std::task::{Context, Poll};
|
2019-02-03 14:05:13 -08:00
|
|
|
|
2019-05-12 06:03:50 -07:00
|
|
|
use actix_service::{IntoService, Service, Transform};
|
2019-11-25 17:54:47 +06:00
|
|
|
use futures::future::{ok, Ready};
|
2019-11-14 18:38:24 +06:00
|
|
|
|
|
|
|
use crate::oneshot;
|
2019-12-11 23:10:02 +06:00
|
|
|
use crate::task::LocalWaker;
|
2019-02-03 14:05:13 -08:00
|
|
|
|
|
|
|
struct Record<I, E> {
|
|
|
|
rx: oneshot::Receiver<Result<I, E>>,
|
|
|
|
tx: oneshot::Sender<Result<I, E>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Error produced by the `InOrder` service.
///
/// Either the wrapped service failed, or the channel that carries a call's
/// result was dropped before a result was delivered.
pub enum InOrderError<E> {
    /// Service error
    Service(E),
    /// Service call dropped
    Disconnected,
}
|
|
|
|
|
|
|
|
impl<E> From<E> for InOrderError<E> {
|
|
|
|
fn from(err: E) -> Self {
|
|
|
|
InOrderError::Service(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<E: fmt::Debug> fmt::Debug for InOrderError<E> {
|
2019-12-02 22:30:09 +06:00
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
2019-02-03 14:05:13 -08:00
|
|
|
match self {
|
|
|
|
InOrderError::Service(e) => write!(f, "InOrderError::Service({:?})", e),
|
|
|
|
InOrderError::Disconnected => write!(f, "InOrderError::Disconnected"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-11 08:39:28 -08:00
|
|
|
impl<E: fmt::Display> fmt::Display for InOrderError<E> {
|
2019-12-02 22:30:09 +06:00
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
2019-02-11 08:39:28 -08:00
|
|
|
match self {
|
|
|
|
InOrderError::Service(e) => e.fmt(f),
|
|
|
|
InOrderError::Disconnected => write!(f, "InOrder service disconnected"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-03 14:05:13 -08:00
|
|
|
/// Transform marker for in-order response delivery.
///
/// The resulting service yields responses as they become available, but
/// always in the order in which their originating requests were submitted.
pub struct InOrder<S> {
    // Ties the marker to the wrapped service type without storing a value.
    _t: PhantomData<S>,
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<S> InOrder<S>
|
|
|
|
where
|
|
|
|
S: Service,
|
|
|
|
S::Response: 'static,
|
|
|
|
S::Future: 'static,
|
|
|
|
S::Error: 'static,
|
|
|
|
{
|
|
|
|
pub fn new() -> Self {
|
2019-02-03 14:05:13 -08:00
|
|
|
Self { _t: PhantomData }
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
pub fn service(service: S) -> InOrderService<S> {
|
2019-03-04 19:38:11 -08:00
|
|
|
InOrderService::new(service)
|
2019-02-03 14:05:13 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<S> Default for InOrder<S>
|
|
|
|
where
|
|
|
|
S: Service,
|
|
|
|
S::Response: 'static,
|
|
|
|
S::Future: 'static,
|
|
|
|
S::Error: 'static,
|
|
|
|
{
|
|
|
|
fn default() -> Self {
|
|
|
|
Self::new()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> Transform<S> for InOrder<S>
|
2019-02-03 14:05:13 -08:00
|
|
|
where
|
2019-03-09 07:27:35 -08:00
|
|
|
S: Service,
|
2019-02-03 14:05:13 -08:00
|
|
|
S::Response: 'static,
|
|
|
|
S::Future: 'static,
|
|
|
|
S::Error: 'static,
|
|
|
|
{
|
2019-03-09 07:27:35 -08:00
|
|
|
type Request = S::Request;
|
2019-02-03 14:05:13 -08:00
|
|
|
type Response = S::Response;
|
|
|
|
type Error = InOrderError<S::Error>;
|
2019-05-12 06:03:50 -07:00
|
|
|
type InitError = Infallible;
|
2019-03-09 07:27:35 -08:00
|
|
|
type Transform = InOrderService<S>;
|
2019-11-14 18:38:24 +06:00
|
|
|
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
2019-02-03 14:05:13 -08:00
|
|
|
|
2019-03-04 19:38:11 -08:00
|
|
|
fn new_transform(&self, service: S) -> Self::Future {
|
|
|
|
ok(InOrderService::new(service))
|
2019-02-03 14:05:13 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
pub struct InOrderService<S: Service> {
|
2019-03-04 19:38:11 -08:00
|
|
|
service: S,
|
2019-12-11 23:10:02 +06:00
|
|
|
waker: Rc<LocalWaker>,
|
2019-02-03 14:05:13 -08:00
|
|
|
acks: VecDeque<Record<S::Response, S::Error>>,
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<S> InOrderService<S>
|
2019-02-03 14:05:13 -08:00
|
|
|
where
|
2019-03-09 07:27:35 -08:00
|
|
|
S: Service,
|
2019-02-03 14:05:13 -08:00
|
|
|
S::Response: 'static,
|
|
|
|
S::Future: 'static,
|
|
|
|
S::Error: 'static,
|
|
|
|
{
|
2019-03-12 13:39:04 -07:00
|
|
|
pub fn new<U>(service: U) -> Self
|
|
|
|
where
|
|
|
|
U: IntoService<S>,
|
|
|
|
{
|
2019-02-03 14:05:13 -08:00
|
|
|
Self {
|
2019-03-12 13:39:04 -07:00
|
|
|
service: service.into_service(),
|
2019-02-03 14:05:13 -08:00
|
|
|
acks: VecDeque::new(),
|
2019-12-11 23:10:02 +06:00
|
|
|
waker: Rc::new(LocalWaker::new()),
|
2019-02-03 14:05:13 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<S> Service for InOrderService<S>
|
2019-02-03 14:05:13 -08:00
|
|
|
where
|
2019-03-09 07:27:35 -08:00
|
|
|
S: Service,
|
2019-02-03 14:05:13 -08:00
|
|
|
S::Response: 'static,
|
|
|
|
S::Future: 'static,
|
|
|
|
S::Error: 'static,
|
|
|
|
{
|
2019-03-09 07:27:35 -08:00
|
|
|
type Request = S::Request;
|
2019-02-03 14:05:13 -08:00
|
|
|
type Response = S::Response;
|
|
|
|
type Error = InOrderError<S::Error>;
|
2019-03-09 07:27:35 -08:00
|
|
|
type Future = InOrderServiceResponse<S>;
|
2019-02-03 14:05:13 -08:00
|
|
|
|
2019-12-02 22:30:09 +06:00
|
|
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
2019-12-11 23:10:02 +06:00
|
|
|
// poll_ready could be called from different task
|
|
|
|
self.waker.register(cx.waker());
|
|
|
|
|
2019-02-03 14:05:13 -08:00
|
|
|
// check acks
|
|
|
|
while !self.acks.is_empty() {
|
|
|
|
let rec = self.acks.front_mut().unwrap();
|
2019-11-14 18:38:24 +06:00
|
|
|
match Pin::new(&mut rec.rx).poll(cx) {
|
|
|
|
Poll::Ready(Ok(res)) => {
|
2019-02-03 14:05:13 -08:00
|
|
|
let rec = self.acks.pop_front().unwrap();
|
|
|
|
let _ = rec.tx.send(res);
|
|
|
|
}
|
2019-11-14 18:38:24 +06:00
|
|
|
Poll::Pending => break,
|
|
|
|
Poll::Ready(Err(oneshot::Canceled)) => {
|
|
|
|
return Poll::Ready(Err(InOrderError::Disconnected))
|
|
|
|
}
|
2019-02-03 14:05:13 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-12 16:03:05 -07:00
|
|
|
// check nested service
|
2019-11-14 18:38:24 +06:00
|
|
|
if let Poll::Pending = self.service.poll_ready(cx).map_err(InOrderError::Service)? {
|
|
|
|
Poll::Pending
|
2019-03-12 16:03:05 -07:00
|
|
|
} else {
|
2019-11-14 18:38:24 +06:00
|
|
|
Poll::Ready(Ok(()))
|
2019-03-12 16:03:05 -07:00
|
|
|
}
|
2019-02-03 14:05:13 -08:00
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
fn call(&mut self, request: S::Request) -> Self::Future {
|
2019-02-03 14:05:13 -08:00
|
|
|
let (tx1, rx1) = oneshot::channel();
|
|
|
|
let (tx2, rx2) = oneshot::channel();
|
|
|
|
self.acks.push_back(Record { rx: rx1, tx: tx2 });
|
|
|
|
|
2019-12-11 23:10:02 +06:00
|
|
|
let waker = self.waker.clone();
|
2019-11-25 17:54:47 +06:00
|
|
|
let fut = self.service.call(request);
|
2019-11-26 08:26:22 +06:00
|
|
|
actix_rt::spawn(async move {
|
2019-11-25 17:54:47 +06:00
|
|
|
let res = fut.await;
|
2019-12-11 23:10:02 +06:00
|
|
|
waker.wake();
|
2019-02-03 14:05:13 -08:00
|
|
|
let _ = tx1.send(res);
|
2019-11-25 17:54:47 +06:00
|
|
|
});
|
2019-02-03 14:05:13 -08:00
|
|
|
|
|
|
|
InOrderServiceResponse { rx: rx2 }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[doc(hidden)]
|
2019-03-09 07:27:35 -08:00
|
|
|
pub struct InOrderServiceResponse<S: Service> {
|
2019-02-03 14:05:13 -08:00
|
|
|
rx: oneshot::Receiver<Result<S::Response, S::Error>>,
|
|
|
|
}
|
|
|
|
|
2019-03-09 07:27:35 -08:00
|
|
|
impl<S: Service> Future for InOrderServiceResponse<S> {
|
2019-11-14 18:38:24 +06:00
|
|
|
type Output = Result<S::Response, InOrderError<S::Error>>;
|
|
|
|
|
2019-12-02 22:30:09 +06:00
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
2019-11-14 18:38:24 +06:00
|
|
|
match Pin::new(&mut self.rx).poll(cx) {
|
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
Poll::Ready(Ok(Ok(res))) => Poll::Ready(Ok(res)),
|
|
|
|
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e.into())),
|
|
|
|
Poll::Ready(Err(_)) => Poll::Ready(Err(InOrderError::Disconnected)),
|
2019-02-03 14:05:13 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {

    use std::task::{Context, Poll};
    use std::time::Duration;

    use super::*;
    use actix_service::Service;
    use futures::channel::oneshot;
    use futures::future::{lazy, poll_fn, FutureExt, LocalBoxFuture};

    /// Test service: each request is a oneshot receiver, and the response is
    /// whatever value eventually arrives on it.
    struct Srv;

    impl Service for Srv {
        type Request = oneshot::Receiver<usize>;
        type Response = usize;
        type Error = ();
        type Future = LocalBoxFuture<'static, Result<usize, ()>>;

        fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, req: oneshot::Receiver<usize>) -> Self::Future {
            // A dropped sender is reported as the unit error.
            req.map(|received| received.map_err(|_| ())).boxed_local()
        }
    }

    #[actix_rt::test]
    async fn test_inorder() {
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();
        let (tx_stop, rx_stop) = oneshot::channel();

        // The service under test runs on its own thread with its own system.
        let worker = std::thread::spawn(move || {
            let _ = actix_rt::System::new("test").block_on(async {
                let mut srv = InOrderService::new(Srv);

                let _ = lazy(|cx| srv.poll_ready(cx)).await;
                let first = srv.call(rx1);
                let second = srv.call(rx2);
                let third = srv.call(rx3);

                // Keep driving poll_ready in the background; that is where
                // results are forwarded to the response futures.
                actix_rt::spawn(async move {
                    let _ = poll_fn(|cx| {
                        let _ = srv.poll_ready(cx);
                        Poll::<()>::Pending
                    })
                    .await;
                });

                assert_eq!(first.await.unwrap(), 1);
                assert_eq!(second.await.unwrap(), 2);
                assert_eq!(third.await.unwrap(), 3);

                let _ = tx_stop.send(());
                actix_rt::System::current().stop();
            });
        });

        // Complete the requests in reverse order of submission.
        let _ = tx3.send(3);
        std::thread::sleep(Duration::from_millis(50));
        let _ = tx2.send(2);
        let _ = tx1.send(1);

        let _ = rx_stop.await;
        let _ = worker.join();
    }
}
|