1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-28 15:40:36 +02:00

Migrate actix-net to std::future (#64)

* Migrate actix-codec, actix-rt, and actix-threadpool to std::future

* update to latest tokio alpha and futures-rs

* Migrate actix-service to std::future,

This is a squash of ~8 commits, since it included a lot of experimentation. To see the commits,
look into the semtexzv/std-future-service-tmp branch.

* update futures-rs and tokio

* Migrate actix-threadpool to std::future (#59)

* Migrate actix-threadpool to std::future

* Cosmetic refactor

- turn log::error! into log::warn! as it doesn't throw any error
- add Clone and Copy impls for Cancelled making it cheap to operate with
- apply rustfmt

* Bump up crate version to 0.2.0 and pre-fill its changelog

* Disable patching 'actix-threadpool' crate in global workspace as unnecessary

* Revert patching and fix 'actix-rt'

* Migrate actix-rt to std::future (#47)

* remove Pin from Service::poll_ready(); simplify combinators api; make code compile

* disable tests

* update travis config

* refactor naming

* drop IntoFuture trait

* Migrate actix-server to std::future (#50)

Still not finished, this is more WIP, this is an aggregation of several commits, which
can be found in semtexzv/std-future-server-tmp branch

* update actix-server

* rename Factor to ServiceFactory

* start server worker in start mehtod

* update actix-utils

* remove IntoTransform trait

* Migrate actix-server::ssl::nativetls to std futures (#61)

* Refactor 'nativetls' module

* Migrate 'actix-server-config' to std futures

- remove "uds" feature
- disable features by default

* Switch NativeTlsAcceptor to use 'tokio-tls' crate

* Bikeshed features names and remove unnecessary dependencies for 'actix-server-config' crate

* update openssl impl

* migrate actix-connect to std::future

* migrate actix-ioframe to std::future

* update version to alpha.1

* fix boxed service

* migrate server rustls support

* migratte openssl and rustls connecttors

* store the thread's handle with arbiter (#62)

* update ssl connect tests

* restore service tests

* update readme
This commit is contained in:
Nikolay Kim
2019-11-14 18:38:24 +06:00
committed by GitHub
parent 9fa2a36b4e
commit 13049b80ca
92 changed files with 4273 additions and 4165 deletions

View File

@ -1,7 +1,11 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use futures::unsync::mpsc;
use actix_utils::mpsc;
use futures::Stream;
use pin_project::pin_project;
use crate::dispatcher::FramedMessage;
use crate::sink::Sink;
@ -13,7 +17,7 @@ pub struct Connect<Io, St = (), Codec = ()> {
impl<Io> Connect<Io>
where
Io: AsyncRead + AsyncWrite,
Io: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn new(io: Io) -> Self {
Self {
@ -24,9 +28,9 @@ where
pub fn codec<Codec>(self, codec: Codec) -> ConnectResult<Io, (), Codec>
where
Codec: Encoder + Decoder,
Codec: Encoder + Decoder + Unpin,
{
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = mpsc::channel();
let sink = Sink::new(tx);
ConnectResult {
@ -38,10 +42,12 @@ where
}
}
#[pin_project]
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
pub(crate) state: St,
#[pin]
pub(crate) framed: Framed<Io, Codec>,
pub(crate) rx: mpsc::UnboundedReceiver<FramedMessage<<Codec as Encoder>::Item>>,
pub(crate) rx: mpsc::Receiver<FramedMessage<<Codec as Encoder>::Item>>,
pub(crate) sink: Sink<<Codec as Encoder>::Item>,
}
@ -72,39 +78,41 @@ impl<Io, St, Codec: Encoder + Decoder> ConnectResult<Io, St, Codec> {
}
}
impl<Io, St, Codec> futures::Stream for ConnectResult<Io, St, Codec>
impl<Io, St, Codec> Stream for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
{
type Item = <Codec as Decoder>::Item;
type Error = <Codec as Decoder>::Error;
type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
self.framed.poll()
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.project().framed.poll_next(cx)
}
}
impl<Io, St, Codec> futures::Sink for ConnectResult<Io, St, Codec>
impl<Io, St, Codec> futures::Sink<<Codec as Encoder>::Item> for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
{
type SinkItem = <Codec as Encoder>::Item;
type SinkError = <Codec as Encoder>::Error;
type Error = <Codec as Encoder>::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().framed.poll_ready(cx)
}
fn start_send(
&mut self,
item: Self::SinkItem,
) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
self.framed.start_send(item)
self: Pin<&mut Self>,
item: <Codec as Encoder>::Item,
) -> Result<(), Self::Error> {
self.project().framed.start_send(item)
}
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
self.framed.poll_complete()
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().framed.poll_flush(cx)
}
fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
self.framed.close()
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().framed.poll_close(cx)
}
}

View File

@ -1,13 +1,17 @@
//! Framed dispatcher service and related utilities
use std::collections::VecDeque;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures::task::AtomicTask;
use futures::unsync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Sink as FutureSink, Stream};
use actix_utils::task::LocalWaker;
use actix_utils::{mpsc, oneshot};
use futures::future::ready;
use futures::{FutureExt, Sink as FutureSink, Stream};
use log::debug;
use crate::cell::Cell;
@ -27,13 +31,14 @@ pub(crate) enum FramedMessage<T> {
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
#[pin_project::pin_project]
pub(crate) struct FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
T: AsyncRead + AsyncWrite + Unpin,
U: Encoder + Decoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -42,7 +47,7 @@ where
state: State<St>,
dispatch_state: FramedState<S, U>,
framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
rx: Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
}
@ -52,8 +57,8 @@ where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -61,7 +66,7 @@ where
framed: Framed<T, U>,
state: State<St>,
service: F,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
rx: mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>,
sink: Sink<<U as Encoder>::Item>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
) -> Self {
@ -75,13 +80,13 @@ where
dispatch_state: FramedState::Processing,
inner: Cell::new(FramedDispatcherInner {
buf: VecDeque::new(),
task: AtomicTask::new(),
task: LocalWaker::new(),
}),
}
}
}
enum FramedState<S: Service, U: Encoder + Decoder> {
enum FramedState<S: Service, U: Encoder + Decoder + Unpin> {
Processing,
Error(ServiceError<S::Error, U>),
FramedError(ServiceError<S::Error, U>),
@ -89,7 +94,7 @@ enum FramedState<S: Service, U: Encoder + Decoder> {
Stopping,
}
impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
impl<S: Service, U: Encoder + Decoder + Unpin> FramedState<S, U> {
fn stop(&mut self, tx: Option<oneshot::Sender<()>>) {
match self {
FramedState::FlushAndStop(ref mut vec) => {
@ -115,149 +120,7 @@ impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
struct FramedDispatcherInner<I, E> {
buf: VecDeque<Result<I, E>>,
task: AtomicTask,
}
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
fn disconnect(&mut self, error: bool) {
if let Some(ref disconnect) = self.disconnect {
(&*disconnect)(&mut *self.state.get_mut(), error);
}
}
fn poll_read(&mut self) -> bool {
loop {
match self.service.poll_ready() {
Ok(Async::Ready(_)) => {
let item = match self.framed.poll() {
Ok(Async::Ready(Some(el))) => el,
Err(err) => {
self.dispatch_state =
FramedState::FramedError(ServiceError::Decoder(err));
return true;
}
Ok(Async::NotReady) => return false,
Ok(Async::Ready(None)) => {
log::trace!("Client disconnected");
self.dispatch_state = FramedState::Stopping;
return true;
}
};
let mut cell = self.inner.clone();
tokio_current_thread::spawn(
self.service
.call(Item::new(self.state.clone(), self.sink.clone(), item))
.then(move |item| {
let item = match item {
Ok(Some(item)) => Ok(item),
Ok(None) => return Ok(()),
Err(err) => Err(err),
};
unsafe {
let inner = cell.get_mut();
inner.buf.push_back(item);
inner.task.notify();
}
Ok(())
}),
);
}
Ok(Async::NotReady) => return false,
Err(err) => {
self.dispatch_state = FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
}
/// write to framed object
fn poll_write(&mut self) -> bool {
let inner = unsafe { self.inner.get_mut() };
let mut rx_done = self.rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !self.framed.is_write_buf_full() {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = self.framed.force_send(msg) {
self.dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
self.dispatch_state =
FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
if !rx_done && self.rx.is_some() {
match self.rx.as_mut().unwrap().poll() {
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
if let Err(err) = self.framed.force_send(msg) {
self.dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
}
Ok(Async::Ready(Some(FramedMessage::Close))) => {
self.dispatch_state.stop(None);
return true;
}
Ok(Async::Ready(Some(FramedMessage::WaitClose(tx)))) => {
self.dispatch_state.stop(Some(tx));
return true;
}
Ok(Async::Ready(None)) => {
rx_done = true;
let _ = self.rx.take();
}
Ok(Async::NotReady) => rx_done = true,
Err(_e) => {
rx_done = true;
let _ = self.rx.take();
}
}
}
if rx_done && buf_empty {
break;
}
}
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Ok(Async::NotReady) => break,
Err(err) => {
debug!("Error sending data: {:?}", err);
self.dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
Ok(Async::Ready(_)) => (),
}
} else {
break;
}
}
false
}
task: LocalWaker,
}
impl<St, S, T, U> Future for FramedDispatcher<St, S, T, U>
@ -265,63 +128,266 @@ where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Item = ();
type Error = ServiceError<S::Error, U>;
type Output = Result<(), ServiceError<S::Error, U>>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
unsafe { self.inner.get_ref().task.register() };
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
unsafe { self.inner.get_ref().task.register(cx.waker()) };
match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
FramedState::Processing => {
if self.poll_read() || self.poll_write() {
self.poll()
} else {
Ok(Async::NotReady)
}
let this = self.project();
poll(
cx,
this.service,
this.state,
this.sink,
this.framed,
this.dispatch_state,
this.rx,
this.inner,
this.disconnect,
)
}
}
fn poll<St, S, T, U>(
cx: &mut Context,
srv: &mut S,
state: &mut State<St>,
sink: &mut Sink<<U as Encoder>::Item>,
framed: &mut Framed<T, U>,
dispatch_state: &mut FramedState<S, U>,
rx: &mut Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>,
inner: &mut Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
disconnect: &mut Option<Rc<dyn Fn(&mut St, bool)>>,
) -> Poll<Result<(), ServiceError<S::Error, U>>>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
match mem::replace(dispatch_state, FramedState::Processing) {
FramedState::Processing => {
if poll_read(cx, srv, state, sink, framed, dispatch_state, inner)
|| poll_write(cx, framed, dispatch_state, rx, inner)
{
poll(
cx,
srv,
state,
sink,
framed,
dispatch_state,
rx,
inner,
disconnect,
)
} else {
Poll::Pending
}
FramedState::Error(err) => {
if self.framed.is_write_buf_empty()
|| (self.poll_write() || self.framed.is_write_buf_empty())
{
self.disconnect(true);
Err(err)
} else {
self.dispatch_state = FramedState::Error(err);
Ok(Async::NotReady)
}
FramedState::Error(err) => {
if framed.is_write_buf_empty()
|| (poll_write(cx, framed, dispatch_state, rx, inner)
|| framed.is_write_buf_empty())
{
if let Some(ref disconnect) = disconnect {
(&*disconnect)(&mut *state.get_mut(), true);
}
Poll::Ready(Err(err))
} else {
*dispatch_state = FramedState::Error(err);
Poll::Pending
}
FramedState::FlushAndStop(mut vec) => {
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Err(err) => {
debug!("Error sending data: {:?}", err);
}
Ok(Async::NotReady) => {
self.dispatch_state = FramedState::FlushAndStop(vec);
return Ok(Async::NotReady);
}
Ok(Async::Ready(_)) => (),
}
FramedState::FlushAndStop(mut vec) => {
if !framed.is_write_buf_empty() {
match Pin::new(framed).poll_flush(cx) {
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
}
Poll::Pending => {
*dispatch_state = FramedState::FlushAndStop(vec);
return Poll::Pending;
}
Poll::Ready(_) => (),
}
};
for tx in vec.drain(..) {
let _ = tx.send(());
}
if let Some(ref disconnect) = disconnect {
(&*disconnect)(&mut *state.get_mut(), false);
}
Poll::Ready(Ok(()))
}
FramedState::FramedError(err) => {
if let Some(ref disconnect) = disconnect {
(&*disconnect)(&mut *state.get_mut(), true);
}
Poll::Ready(Err(err))
}
FramedState::Stopping => {
if let Some(ref disconnect) = disconnect {
(&*disconnect)(&mut *state.get_mut(), false);
}
Poll::Ready(Ok(()))
}
}
}
fn poll_read<St, S, T, U>(
cx: &mut Context,
srv: &mut S,
state: &mut State<St>,
sink: &mut Sink<<U as Encoder>::Item>,
framed: &mut Framed<T, U>,
dispatch_state: &mut FramedState<S, U>,
inner: &mut Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
) -> bool
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
loop {
match srv.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
let item = match framed.next_item(cx) {
Poll::Ready(Some(Ok(el))) => el,
Poll::Ready(Some(Err(err))) => {
*dispatch_state = FramedState::FramedError(ServiceError::Decoder(err));
return true;
}
Poll::Pending => return false,
Poll::Ready(None) => {
log::trace!("Client disconnected");
*dispatch_state = FramedState::Stopping;
return true;
}
};
for tx in vec.drain(..) {
let _ = tx.send(());
}
self.disconnect(false);
Ok(Async::Ready(()))
let mut cell = inner.clone();
tokio_executor::current_thread::spawn(
srv.call(Item::new(state.clone(), sink.clone(), item))
.then(move |item| {
let item = match item {
Ok(Some(item)) => Ok(item),
Ok(None) => return ready(()),
Err(err) => Err(err),
};
unsafe {
let inner = cell.get_mut();
inner.buf.push_back(item);
inner.task.wake();
}
ready(())
}),
);
}
FramedState::FramedError(err) => {
self.disconnect(true);
Err(err)
}
FramedState::Stopping => {
self.disconnect(false);
Ok(Async::Ready(()))
Poll::Pending => return false,
Poll::Ready(Err(err)) => {
*dispatch_state = FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
}
/// write to framed object
fn poll_write<St, S, T, U>(
cx: &mut Context,
framed: &mut Framed<T, U>,
dispatch_state: &mut FramedState<S, U>,
rx: &mut Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>,
inner: &mut Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
) -> bool
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
let inner = unsafe { inner.get_mut() };
let mut rx_done = rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !framed.is_write_buf_full() {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = framed.force_send(msg) {
*dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
*dispatch_state = FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
if !rx_done && rx.is_some() {
match Pin::new(rx.as_mut().unwrap()).poll_next(cx) {
Poll::Ready(Some(FramedMessage::Message(msg))) => {
if let Err(err) = framed.force_send(msg) {
*dispatch_state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
}
Poll::Ready(Some(FramedMessage::Close)) => {
dispatch_state.stop(None);
return true;
}
Poll::Ready(Some(FramedMessage::WaitClose(tx))) => {
dispatch_state.stop(Some(tx));
return true;
}
Poll::Ready(None) => {
rx_done = true;
let _ = rx.take();
}
Poll::Pending => rx_done = true,
}
}
if rx_done && buf_empty {
break;
}
}
if !framed.is_write_buf_empty() {
match framed.flush(cx) {
Poll::Pending => break,
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
*dispatch_state = FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
Poll::Ready(_) => (),
}
} else {
break;
}
}
false
}

View File

@ -1,9 +1,14 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::{Async, Future, Poll};
use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use either::Either;
use futures::future::{FutureExt, LocalBoxFuture};
use pin_project::{pin_project, project};
use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::FramedDispatcher;
@ -27,9 +32,9 @@ impl<St, Codec> Builder<St, Codec> {
pub fn service<Io, C, F>(self, connect: F) -> ServiceBuilder<St, C, Io, Codec>
where
F: IntoService<C>,
Io: AsyncRead + AsyncWrite,
Io: AsyncRead + AsyncWrite + Unpin,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
Codec: Decoder + Encoder,
Codec: Decoder + Encoder + Unpin,
{
ServiceBuilder {
connect: connect.into_service(),
@ -41,19 +46,19 @@ impl<St, Codec> Builder<St, Codec> {
/// Construct framed handler new service with specified connect service
pub fn factory<Io, C, F>(self, connect: F) -> NewServiceBuilder<St, C, Io, Codec>
where
F: IntoNewService<C>,
Io: AsyncRead + AsyncWrite,
C: NewService<
F: IntoServiceFactory<C>,
Io: AsyncRead + AsyncWrite + Unpin,
C: ServiceFactory<
Config = (),
Request = Connect<Io>,
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: 'static,
Codec: Decoder + Encoder,
Codec: Decoder + Encoder + Unpin,
{
NewServiceBuilder {
connect: connect.into_new_service(),
connect: connect.into_factory(),
disconnect: None,
_t: PhantomData,
}
@ -69,10 +74,10 @@ pub struct ServiceBuilder<St, C, Io, Codec> {
impl<St, C, Io, Codec> ServiceBuilder<St, C, Io, Codec>
where
St: 'static,
Io: AsyncRead + AsyncWrite,
Io: AsyncRead + AsyncWrite + Unpin,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
Codec: Decoder + Encoder,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -93,8 +98,8 @@ where
service: F,
) -> impl Service<Request = Io, Response = (), Error = ServiceError<C::Error, Codec>>
where
F: IntoNewService<T>,
T: NewService<
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
@ -104,7 +109,7 @@ where
{
FramedServiceImpl {
connect: self.connect,
handler: Rc::new(service.into_new_service()),
handler: Rc::new(service.into_factory()),
disconnect: self.disconnect.clone(),
_t: PhantomData,
}
@ -120,11 +125,15 @@ pub struct NewServiceBuilder<St, C, Io, Codec> {
impl<St, C, Io, Codec> NewServiceBuilder<St, C, Io, Codec>
where
St: 'static,
Io: AsyncRead + AsyncWrite,
C: NewService<Config = (), Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
Io: AsyncRead + AsyncWrite + Unpin,
C: ServiceFactory<
Config = (),
Request = Connect<Io>,
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: 'static,
Codec: Decoder + Encoder,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -142,15 +151,15 @@ where
pub fn finish<F, T, Cfg>(
self,
service: F,
) -> impl NewService<
) -> impl ServiceFactory<
Config = Cfg,
Request = Io,
Response = (),
Error = ServiceError<C::Error, Codec>,
>
where
F: IntoNewService<T>,
T: NewService<
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
@ -160,7 +169,7 @@ where
{
FramedService {
connect: self.connect,
handler: Rc::new(service.into_new_service()),
handler: Rc::new(service.into_factory()),
disconnect: self.disconnect,
_t: PhantomData,
}
@ -174,21 +183,25 @@ pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> {
_t: PhantomData<(St, Io, Codec, Cfg)>,
}
impl<St, C, T, Io, Codec, Cfg> NewService for FramedService<St, C, T, Io, Codec, Cfg>
impl<St, C, T, Io, Codec, Cfg> ServiceFactory for FramedService<St, C, T, Io, Codec, Cfg>
where
St: 'static,
Io: AsyncRead + AsyncWrite,
C: NewService<Config = (), Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
Io: AsyncRead + AsyncWrite + Unpin,
C: ServiceFactory<
Config = (),
Request = Connect<Io>,
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: 'static,
T: NewService<
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
> + 'static,
Codec: Decoder + Encoder,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -198,23 +211,24 @@ where
type Error = ServiceError<C::Error, Codec>;
type InitError = C::InitError;
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>;
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &Cfg) -> Self::Future {
let handler = self.handler.clone();
let disconnect = self.disconnect.clone();
// create connect service and then create service impl
Box::new(
self.connect
.new_service(&())
.map(move |connect| FramedServiceImpl {
self.connect
.new_service(&())
.map(move |result| {
result.map(move |connect| FramedServiceImpl {
connect,
handler,
disconnect,
_t: PhantomData,
}),
)
})
})
.boxed_local()
}
}
@ -227,18 +241,18 @@ pub struct FramedServiceImpl<St, C, T, Io, Codec> {
impl<St, C, T, Io, Codec> Service for FramedServiceImpl<St, C, T, Io, Codec>
where
Io: AsyncRead + AsyncWrite,
Io: AsyncRead + AsyncWrite + Unpin,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -247,8 +261,8 @@ where
type Error = ServiceError<C::Error, Codec>;
type Future = FramedServiceImplResponse<St, Io, Codec, C, T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.connect.poll_ready().map_err(|e| e.into())
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.connect.poll_ready(cx).map_err(|e| e.into())
}
fn call(&mut self, req: Io) -> Self::Future {
@ -256,108 +270,155 @@ where
inner: FramedServiceImplResponseInner::Connect(
self.connect.call(Connect::new(req)),
self.handler.clone(),
self.disconnect.clone(),
),
disconnect: self.disconnect.clone(),
}
}
}
#[pin_project]
pub struct FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
}
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
Connect(C::Future, Rc<T>),
Handler(T::Future, Option<ConnectResult<Io, St, Codec>>),
Dispatcher(FramedDispatcher<St, T::Service, Io, Codec>),
}
impl<St, Io, Codec, C, T> Future for FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: NewService<
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as NewService>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
type Item = ();
type Error = ServiceError<C::Error, Codec>;
type Output = Result<(), ServiceError<C::Error, Codec>>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
FramedServiceImplResponseInner::Connect(ref mut fut, ref handler) => {
match fut.poll()? {
Async::Ready(res) => {
self.inner = FramedServiceImplResponseInner::Handler(
handler.new_service(&res.state),
Some(res),
);
self.poll()
}
Async::NotReady => Ok(Async::NotReady),
}
}
FramedServiceImplResponseInner::Handler(ref mut fut, ref mut res) => {
match fut.poll()? {
Async::Ready(handler) => {
let res = res.take().unwrap();
self.inner =
FramedServiceImplResponseInner::Dispatcher(FramedDispatcher::new(
res.framed,
State::new(res.state),
handler,
res.rx,
res.sink,
self.disconnect.clone(),
));
self.poll()
}
Async::NotReady => Ok(Async::NotReady),
}
}
FramedServiceImplResponseInner::Dispatcher(ref mut fut) => fut.poll(),
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) {
Either::Left(new) => this.inner = new,
Either::Right(poll) => return poll,
};
}
}
}
#[pin_project]
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
Connect(#[pin] C::Future, Rc<T>, Option<Rc<dyn Fn(&mut St, bool)>>),
Handler(
#[pin] T::Future,
Option<ConnectResult<Io, St, Codec>>,
Option<Rc<dyn Fn(&mut St, bool)>>,
),
Dispatcher(FramedDispatcher<St, T::Service, Io, Codec>),
}
impl<St, Io, Codec, C, T> FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
#[project]
fn poll(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Either<
FramedServiceImplResponseInner<St, Io, Codec, C, T>,
Poll<Result<(), ServiceError<C::Error, Codec>>>,
> {
#[project]
match self.project() {
FramedServiceImplResponseInner::Connect(
ref mut fut,
ref handler,
ref mut disconnect,
) => match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler(
handler.new_service(&res.state),
Some(res),
disconnect.take(),
)),
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
},
FramedServiceImplResponseInner::Handler(
ref mut fut,
ref mut res,
ref mut disconnect,
) => match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(handler)) => {
let res = res.take().unwrap();
Either::Left(FramedServiceImplResponseInner::Dispatcher(
FramedDispatcher::new(
res.framed,
State::new(res.state),
handler,
res.rx,
res.sink,
disconnect.take(),
),
))
}
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
},
FramedServiceImplResponseInner::Dispatcher(ref mut fut) => {
Either::Right(Pin::new(fut).poll(cx))
}
}
}
}

View File

@ -1,11 +1,11 @@
use std::fmt;
use futures::unsync::{mpsc, oneshot};
use futures::Future;
use actix_utils::{mpsc, oneshot};
use futures::future::{Future, FutureExt};
use crate::dispatcher::FramedMessage;
pub struct Sink<T>(mpsc::UnboundedSender<FramedMessage<T>>);
pub struct Sink<T>(mpsc::Sender<FramedMessage<T>>);
impl<T> Clone for Sink<T> {
fn clone(&self) -> Self {
@ -14,26 +14,26 @@ impl<T> Clone for Sink<T> {
}
impl<T> Sink<T> {
pub(crate) fn new(tx: mpsc::UnboundedSender<FramedMessage<T>>) -> Self {
pub(crate) fn new(tx: mpsc::Sender<FramedMessage<T>>) -> Self {
Sink(tx)
}
/// Close connection
pub fn close(&self) {
let _ = self.0.unbounded_send(FramedMessage::Close);
let _ = self.0.send(FramedMessage::Close);
}
/// Close connection
pub fn wait_close(&self) -> impl Future<Item = (), Error = ()> {
pub fn wait_close(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(FramedMessage::WaitClose(tx));
let _ = self.0.send(FramedMessage::WaitClose(tx));
rx.map_err(|_| ())
rx.map(|_| ())
}
/// Send item
pub fn send(&self, item: T) {
let _ = self.0.unbounded_send(FramedMessage::Message(item));
let _ = self.0.send(FramedMessage::Message(item));
}
}