1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 20:01:48 +01:00

Revert InOrder service changes

This commit is contained in:
Nikolay Kim 2019-12-11 23:10:02 +06:00
parent 52ecb4bcc5
commit 4305cdba2c
3 changed files with 14 additions and 1 deletions

View File

@ -1,5 +1,9 @@
# Changes
## [1.0.3] - 2019-12-11
* Revert InOrder service changes
## [1.0.2] - 2019-12-11
* Allow to create `framed::Dispatcher` with custom `mpsc::Receiver`

View File

@ -1,6 +1,6 @@
[package]
name = "actix-utils"
version = "1.0.2"
version = "1.0.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"]

View File

@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::fmt;
use std::rc::Rc;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
@ -10,6 +11,7 @@ use actix_service::{IntoService, Service, Transform};
use futures::future::{ok, Ready};
use crate::oneshot;
use crate::task::LocalWaker;
struct Record<I, E> {
rx: oneshot::Receiver<Result<I, E>>,
@ -103,6 +105,7 @@ where
pub struct InOrderService<S: Service> {
service: S,
waker: Rc<LocalWaker>,
acks: VecDeque<Record<S::Response, S::Error>>,
}
@ -120,6 +123,7 @@ where
Self {
service: service.into_service(),
acks: VecDeque::new(),
waker: Rc::new(LocalWaker::new()),
}
}
}
@ -137,6 +141,9 @@ where
type Future = InOrderServiceResponse<S>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// poll_ready could be called from different task
self.waker.register(cx.waker());
// check acks
while !self.acks.is_empty() {
let rec = self.acks.front_mut().unwrap();
@ -165,9 +172,11 @@ where
let (tx2, rx2) = oneshot::channel();
self.acks.push_back(Record { rx: rx1, tx: tx2 });
let waker = self.waker.clone();
let fut = self.service.call(request);
actix_rt::spawn(async move {
let res = fut.await;
waker.wake();
let _ = tx1.send(res);
});