diff --git a/Cargo.toml b/Cargo.toml index f8e43727..f032478a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "actix-codec", "actix-connect", - "actix-ioframe", "actix-rt", "actix-macros", "actix-service", @@ -19,7 +18,6 @@ members = [ [patch.crates-io] actix-codec = { path = "actix-codec" } actix-connect = { path = "actix-connect" } -actix-ioframe = { path = "actix-ioframe" } actix-rt = { path = "actix-rt" } actix-macros = { path = "actix-macros" } actix-server = { path = "actix-server" } diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index 2259e44e..fa471c01 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,6 +1,8 @@ # Changes -* Use `.advance()` intead of `.split_to()` +## Unreleased - 2020-xx-xx +* Use `.advance()` instead of `.split_to()`. +* Upgrade `tokio-util` to `0.3`. ## [0.2.0] - 2019-12-10 @@ -8,7 +10,7 @@ ## [0.2.0-alpha.4] -* Fix buffer remaining capacity calcualtion +* Fix buffer remaining capacity calculation ## [0.2.0-alpha.3] diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 1d9a669b..8b14dd70 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -21,7 +21,7 @@ bitflags = "1.2.1" bytes = "0.5.2" futures-core = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } -tokio = { version = "0.2.4", default-features=false } -tokio-util = { version = "0.2.0", default-features=false, features=["codec"] } +tokio = { version = "0.2.5", default-features = false } +tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] } log = "0.4" pin-project = "0.4.17" diff --git a/actix-codec/src/bcodec.rs b/actix-codec/src/bcodec.rs index c71c0fa4..4d92ee69 100644 --- a/actix-codec/src/bcodec.rs +++ b/actix-codec/src/bcodec.rs @@ -9,8 +9,7 @@ use super::{Decoder, Encoder}; #[derive(Debug, Copy, Clone)] pub struct BytesCodec; -impl Encoder for BytesCodec { - type Item = Bytes; +impl Encoder for BytesCodec { type Error = io::Error; fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 685d87c9..cba5dd3c 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -9,7 +9,9 @@ use pin_project::pin_project; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; +/// Low-water mark const LW: usize = 1024; +/// High-water mark const HW: usize = 8 * 1024; bitflags::bitflags! { @@ -34,7 +36,7 @@ pub struct Framed { impl Framed where T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, + U: Decoder, { /// Provides a `Stream` and `Sink` interface for reading and writing to this /// `Io` object, using `Decode` and `Encode` to read and write the raw data. @@ -129,7 +131,7 @@ impl Framed { } /// Consume the `Frame`, returning `Frame` with different codec. - pub fn into_framed(self, codec: U2) -> Framed { + pub fn into_framed(self, codec: U2) -> Framed { Framed { codec, io: self.io, @@ -140,7 +142,7 @@ impl Framed { } /// Consume the `Frame`, returning `Frame` with different io. - pub fn map_io(self, f: F) -> Framed + pub fn map_io(self, f: F) -> Framed where F: Fn(T) -> T2, { @@ -154,7 +156,7 @@ impl Framed { } /// Consume the `Frame`, returning `Frame` with different codec. - pub fn map_codec(self, f: F) -> Framed + pub fn map_codec(self, f: F) -> Framed where F: Fn(U) -> U2, { @@ -186,10 +188,10 @@ impl Framed { impl Framed { /// Serialize item and Write to the inner buffer - pub fn write(mut self: Pin<&mut Self>, item: ::Item) -> Result<(), ::Error> + pub fn write(mut self: Pin<&mut Self>, item: I) -> Result<(), >::Error> where T: AsyncWrite, - U: Encoder, + U: Encoder, { let this = self.as_mut().project(); let remaining = this.write_buf.capacity() - this.write_buf.len(); @@ -209,7 +211,10 @@ impl Framed { } /// Try to read underlying I/O stream and decode item. - pub fn next_item(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> + pub fn next_item( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll::Item, U::Error>>> where T: AsyncRead, U: Decoder, @@ -266,10 +271,10 @@ impl Framed { } /// Flush write buffer to underlying I/O stream. - pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> + pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> where T: AsyncWrite, - U: Encoder, + U: Encoder, { let mut this = self.as_mut().project(); log::trace!("flushing framed transport"); @@ -277,9 +282,7 @@ impl Framed { while !this.write_buf.is_empty() { log::trace!("writing; remaining={}", this.write_buf.len()); - let n = ready!( - this.io.as_mut().poll_write(cx, this.write_buf) - )?; + let n = ready!(this.io.as_mut().poll_write(cx, this.write_buf))?; if n == 0 { return Poll::Ready(Err(io::Error::new( @@ -301,10 +304,10 @@ impl Framed { } /// Flush write buffer and shutdown underlying I/O stream. - pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> + pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> where T: AsyncWrite, - U: Encoder, + U: Encoder, { let mut this = self.as_mut().project(); ready!(this.io.as_mut().poll_flush(cx))?; @@ -325,10 +328,10 @@ where } } -impl Sink for Framed +impl Sink for Framed where T: AsyncWrite, - U: Encoder, + U: Encoder, U::Error: From, { type Error = U::Error; @@ -341,24 +344,15 @@ where } } - fn start_send( - self: Pin<&mut Self>, - item: ::Item, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { self.write(item) } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.flush(cx) } - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.close(cx) } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 7c38bdf7..3035be13 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -8,7 +8,7 @@ //! [`AsyncWrite`]: AsyncWrite //! [`Sink`]: futures_sink::Sink //! [`Stream`]: futures_core::Stream -#![deny(rust_2018_idioms, warnings)] +#![deny(rust_2018_idioms)] mod bcodec; mod framed; diff --git a/actix-ioframe/CHANGES.md b/actix-ioframe/CHANGES.md deleted file mode 100644 index 1b332a52..00000000 --- a/actix-ioframe/CHANGES.md +++ /dev/null @@ -1,33 +0,0 @@ -# Changes - -## [0.5.0] - 2019-12-29 - -* Simplify state management - -* Allow to set custom output stream - -* Removed disconnect callback - -## [0.4.1] - 2019-12-11 - -* Disconnect callback accepts owned state - -## [0.4.0] - 2019-12-11 - -* Remove `E` param - -## [0.3.0-alpha.3] - 2019-12-07 - -* Migrate to tokio 0.2 - -## [0.3.0-alpha.2] - 2019-12-02 - -* Migrate to `std::future` - -## [0.1.1] - 2019-10-14 - -* Re-register task on every dispatcher poll. - -## [0.1.0] - 2019-09-25 - -* Initial release diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml deleted file mode 100644 index 0cf431a6..00000000 --- a/actix-ioframe/Cargo.toml +++ /dev/null @@ -1,33 +0,0 @@ -[package] -name = "actix-ioframe" -version = "0.5.0" -authors = ["Nikolay Kim "] -description = "Actix framed service" -keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-ioframe/" -categories = ["network-programming", "asynchronous"] -license = "MIT OR Apache-2.0" -edition = "2018" - -[lib] -name = "actix_ioframe" -path = "src/lib.rs" - -[dependencies] -actix-service = "1.0.1" -actix-codec = "0.2.0" -actix-utils = "1.0.4" -actix-rt = "1.0.0" -bytes = "0.5.3" -either = "1.5.3" -futures-sink = { version = "0.3.4", default-features = false } -futures-core = { version = "0.3.4", default-features = false } -pin-project = "0.4.17" -log = "0.4" - -[dev-dependencies] -actix-connect = "2.0.0-alpha.2" -actix-testing = "1.0.0" -futures-util = { version = "0.3.4", default-features = false } diff --git a/actix-ioframe/LICENSE-APACHE b/actix-ioframe/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-ioframe/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-ioframe/LICENSE-MIT b/actix-ioframe/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-ioframe/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-ioframe/README.md b/actix-ioframe/README.md new file mode 100644 index 00000000..f45c590d --- /dev/null +++ b/actix-ioframe/README.md @@ -0,0 +1,3 @@ +# actix-ioframe + +**This crate has been deprecated and removed.** diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs deleted file mode 100644 index 4e2980d1..00000000 --- a/actix-ioframe/src/connect.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_utils::mpsc::Receiver; -use futures_core::stream::Stream; - -pub struct Connect -where - Codec: Encoder + Decoder, -{ - io: Io, - _t: PhantomData, -} - -impl Connect -where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, -{ - pub(crate) fn new(io: Io) -> Self { - Self { - io, - _t: PhantomData, - } - } - - pub fn codec( - self, - codec: Codec, - ) -> ConnectResult::Item>> { - ConnectResult { - state: (), - out: None, - framed: Framed::new(self.io, codec), - } - } -} - -#[pin_project::pin_project] -pub struct ConnectResult { - pub(crate) state: St, - pub(crate) out: Option, - #[pin] - pub(crate) framed: Framed, -} - -impl ConnectResult { - #[inline] - pub fn get_ref(&self) -> &Io { - self.framed.get_ref() - } - - #[inline] - pub fn get_mut(&mut self) -> &mut Io { - self.framed.get_mut() - } - - pub fn out(self, out: U) -> ConnectResult - where - U: Stream::Item> + Unpin, - { - ConnectResult { - state: self.state, - framed: self.framed, - out: Some(out), - } - } - - #[inline] - pub fn state(self, state: S) -> ConnectResult { - ConnectResult { - state, - framed: self.framed, - out: self.out, - } - } -} - -impl Stream for ConnectResult -where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, -{ - type Item = Result<::Item, ::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().framed.next_item(cx) - } -} - -impl futures_sink::Sink<::Item> - for ConnectResult -where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, -{ - type Error = ::Error; - - fn poll_ready(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - if self.as_mut().project().framed.is_write_ready() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn start_send( - self: Pin<&mut Self>, - item: ::Item, - ) -> Result<(), Self::Error> { - self.project().framed.write(item) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().project().framed.flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().project().framed.close(cx) - } -} diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs deleted file mode 100644 index b7d5dd9f..00000000 --- a/actix-ioframe/src/dispatcher.rs +++ /dev/null @@ -1,248 +0,0 @@ -//! Framed dispatcher service and related utilities -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_service::Service; -use actix_utils::mpsc; -use futures_core::stream::Stream; -use pin_project::pin_project; -use log::debug; - -use crate::error::ServiceError; - -type Request = ::Item; -type Response = ::Item; - -/// FramedTransport - is a future that reads frames from Framed object -/// and pass then to the service. -#[pin_project] -pub(crate) struct Dispatcher -where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - service: S, - sink: Option, - state: FramedState, - #[pin] - framed: Framed, - rx: mpsc::Receiver::Item, S::Error>>, -} - -impl Dispatcher -where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - pub(crate) fn new(framed: Framed, service: S, sink: Option) -> Self { - Dispatcher { - sink, - service, - framed, - rx: mpsc::channel().1, - state: FramedState::Processing, - } - } -} - -enum FramedState { - Processing, - Error(ServiceError), - FramedError(ServiceError), - FlushAndStop, - Stopping, -} - -impl FramedState { - fn take_error(&mut self) -> ServiceError { - match std::mem::replace(self, FramedState::Processing) { - FramedState::Error(err) => err, - _ => panic!(), - } - } - - fn take_framed_error(&mut self) -> ServiceError { - match std::mem::replace(self, FramedState::Processing) { - FramedState::FramedError(err) => err, - _ => panic!(), - } - } -} - -impl Dispatcher -where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool { - loop { - let this = self.as_mut().project(); - match this.service.poll_ready(cx) { - Poll::Ready(Ok(_)) => { - let item = match this.framed.next_item(cx) { - Poll::Ready(Some(Ok(el))) => el, - Poll::Ready(Some(Err(err))) => { - *this.state = FramedState::FramedError(ServiceError::Decoder(err)); - return true; - } - Poll::Pending => return false, - Poll::Ready(None) => { - log::trace!("Client disconnected"); - *this.state = FramedState::Stopping; - return true; - } - }; - - let tx = this.rx.sender(); - let fut = this.service.call(item); - actix_rt::spawn(async move { - let item = fut.await; - let item = match item { - Ok(Some(item)) => Ok(item), - Ok(None) => return, - Err(err) => Err(err), - }; - let _ = tx.send(item); - }); - } - Poll::Pending => return false, - Poll::Ready(Err(err)) => { - *this.state = FramedState::Error(ServiceError::Service(err)); - return true; - } - } - } - } - - /// write to framed object - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool { - loop { - let mut this = self.as_mut().project(); - while !this.framed.is_write_buf_full() { - match Pin::new(&mut this.rx).poll_next(cx) { - Poll::Ready(Some(Ok(msg))) => { - if let Err(err) = this.framed.as_mut().write(msg) { - *this.state = FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - continue; - } - Poll::Ready(Some(Err(err))) => { - *this.state = FramedState::Error(ServiceError::Service(err)); - return true; - } - Poll::Ready(None) | Poll::Pending => (), - } - - if this.sink.is_some() { - match Pin::new(this.sink.as_mut().unwrap()).poll_next(cx) { - Poll::Ready(Some(msg)) => { - if let Err(err) = this.framed.as_mut().write(msg) { - *this.state = - FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - continue; - } - Poll::Ready(None) => { - let _ = this.sink.take(); - *this.state = FramedState::FlushAndStop; - return true; - } - Poll::Pending => (), - } - } - break; - } - - if !this.framed.is_write_buf_empty() { - match this.framed.as_mut().flush(cx) { - Poll::Pending => break, - Poll::Ready(Ok(_)) => (), - Poll::Ready(Err(err)) => { - debug!("Error sending data: {:?}", err); - *this.state = FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - } - } else { - break; - } - } - false - } - - pub(crate) fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let mut this = self.as_mut().project(); - match this.state { - FramedState::Processing => loop { - let read = self.as_mut().poll_read(cx); - let write = self.as_mut().poll_write(cx); - if read || write { - continue; - } else { - return Poll::Pending; - } - }, - FramedState::Error(_) => { - // flush write buffer - if !this.framed.is_write_buf_empty() { - if let Poll::Pending = this.framed.flush(cx) { - return Poll::Pending; - } - } - Poll::Ready(Err(this.state.take_error())) - } - FramedState::FlushAndStop => { - // drain service responses - match Pin::new(this.rx).poll_next(cx) { - Poll::Ready(Some(Ok(msg))) => { - if this.framed.as_mut().write(msg).is_err() { - return Poll::Ready(Ok(())); - } - } - Poll::Ready(Some(Err(_))) => return Poll::Ready(Ok(())), - Poll::Ready(None) | Poll::Pending => (), - } - - // flush io - if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { - Poll::Ready(Err(err)) => { - debug!("Error sending data: {:?}", err); - } - Poll::Pending => { - return Poll::Pending; - } - Poll::Ready(_) => (), - } - }; - Poll::Ready(Ok(())) - } - FramedState::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())), - FramedState::Stopping => Poll::Ready(Ok(())), - } - } -} diff --git a/actix-ioframe/src/error.rs b/actix-ioframe/src/error.rs deleted file mode 100644 index 3eb0fed9..00000000 --- a/actix-ioframe/src/error.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::fmt; - -use actix_codec::{Decoder, Encoder}; - -/// Framed service errors -pub enum ServiceError { - /// Inner service error - Service(E), - /// Encoder parse error - Encoder(::Error), - /// Decoder parse error - Decoder(::Error), -} - -impl From for ServiceError { - fn from(err: E) -> Self { - ServiceError::Service(err) - } -} - -impl fmt::Debug for ServiceError -where - E: fmt::Debug, - ::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - ServiceError::Service(ref e) => write!(fmt, "ServiceError::Service({:?})", e), - ServiceError::Encoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e), - ServiceError::Decoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e), - } - } -} - -impl fmt::Display for ServiceError -where - E: fmt::Display, - ::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - ServiceError::Service(ref e) => write!(fmt, "{}", e), - ServiceError::Encoder(ref e) => write!(fmt, "{:?}", e), - ServiceError::Decoder(ref e) => write!(fmt, "{:?}", e), - } - } -} diff --git a/actix-ioframe/src/lib.rs b/actix-ioframe/src/lib.rs deleted file mode 100644 index 3f82a29f..00000000 --- a/actix-ioframe/src/lib.rs +++ /dev/null @@ -1,11 +0,0 @@ -// #![deny(rust_2018_idioms, warnings)] -#![allow(clippy::type_complexity, clippy::too_many_arguments)] - -mod connect; -mod dispatcher; -mod error; -mod service; - -pub use self::connect::{Connect, ConnectResult}; -pub use self::error::ServiceError; -pub use self::service::{Builder, FactoryBuilder}; diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs deleted file mode 100644 index f3b5ab85..00000000 --- a/actix-ioframe/src/service.rs +++ /dev/null @@ -1,413 +0,0 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; -use either::Either; -use futures_core::{ready, stream::Stream}; - -use crate::connect::{Connect, ConnectResult}; -use crate::dispatcher::Dispatcher; -use crate::error::ServiceError; - -type RequestItem = ::Item; -type ResponseItem = Option<::Item>; - -/// Service builder - structure that follows the builder pattern -/// for building instances for framed services. -pub struct Builder { - connect: C, - _t: PhantomData<(St, Io, Codec, Out)>, -} - -impl Builder -where - C: Service, Response = ConnectResult>, - Io: AsyncRead + AsyncWrite, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - /// Construct framed handler service with specified connect service - pub fn new(connect: F) -> Builder - where - F: IntoService, - Io: AsyncRead + AsyncWrite, - C: Service, Response = ConnectResult>, - Codec: Decoder + Encoder, - Out: Stream::Item>, - { - Builder { - connect: connect.into_service(), - _t: PhantomData, - } - } - - /// Provide stream items handler service and construct service factory. - pub fn build(self, service: F) -> FramedServiceImpl - where - F: IntoServiceFactory, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - { - FramedServiceImpl { - connect: self.connect, - handler: Rc::new(service.into_factory()), - _t: PhantomData, - } - } -} - -/// Service builder - structure that follows the builder pattern -/// for building instances for framed services. -pub struct FactoryBuilder { - connect: C, - _t: PhantomData<(St, Io, Codec, Out)>, -} - -impl FactoryBuilder -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - Codec: Decoder + Encoder, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - /// Construct framed handler new service with specified connect service - pub fn new(connect: F) -> FactoryBuilder - where - F: IntoServiceFactory, - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - Codec: Decoder + Encoder, - Out: Stream::Item> + Unpin, - { - FactoryBuilder { - connect: connect.into_factory(), - _t: PhantomData, - } - } - - pub fn build(self, service: F) -> FramedService - where - F: IntoServiceFactory, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - { - FramedService { - connect: self.connect, - handler: Rc::new(service.into_factory()), - _t: PhantomData, - } - } -} - -pub struct FramedService { - connect: C, - handler: Rc, - _t: PhantomData<(St, Io, Codec, Out, Cfg)>, -} - -impl ServiceFactory - for FramedService -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Config = Cfg; - type Request = Io; - type Response = (); - type Error = ServiceError; - type InitError = C::InitError; - type Service = FramedServiceImpl; - type Future = FramedServiceResponse; - - fn new_service(&self, _: Cfg) -> Self::Future { - // create connect service and then create service impl - FramedServiceResponse { - fut: self.connect.new_service(()), - handler: self.handler.clone(), - } - } -} - -#[pin_project::pin_project] -pub struct FramedServiceResponse -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - #[pin] - fut: C::Future, - handler: Rc, -} - -impl Future for FramedServiceResponse -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Output = Result, C::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let connect = ready!(this.fut.poll(cx))?; - - Poll::Ready(Ok(FramedServiceImpl { - connect, - handler: this.handler.clone(), - _t: PhantomData, - })) - } -} - -pub struct FramedServiceImpl { - connect: C, - handler: Rc, - _t: PhantomData<(St, Io, Codec, Out)>, -} - -impl Service for FramedServiceImpl -where - Io: AsyncRead + AsyncWrite, - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Request = Io; - type Response = (); - type Error = ServiceError; - type Future = FramedServiceImplResponse; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.connect.poll_ready(cx).map_err(|e| e.into()) - } - - fn call(&mut self, req: Io) -> Self::Future { - FramedServiceImplResponse { - inner: FramedServiceImplResponseInner::Connect( - self.connect.call(Connect::new(req)), - self.handler.clone(), - ), - } - } -} - -#[pin_project::pin_project] -pub struct FramedServiceImplResponse -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - #[pin] - inner: FramedServiceImplResponseInner, -} - -impl Future for FramedServiceImplResponse -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Output = Result<(), ServiceError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - loop { - match this.inner.poll(cx) { - Either::Left(new) => { - this = self.as_mut().project(); - this.inner.set(new) - } - Either::Right(poll) => return poll, - }; - } - } -} - -#[pin_project::pin_project(project = FramedServiceImplResponseInnerProj)] -enum FramedServiceImplResponseInner -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - Connect(#[pin] C::Future, Rc), - Handler(#[pin] T::Future, Option>, Option), - Dispatcher(#[pin] Dispatcher), -} - -impl FramedServiceImplResponseInner -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Either< - FramedServiceImplResponseInner, - Poll>>, - > { - match self.project() { - FramedServiceImplResponseInnerProj::Connect(fut, handler) => match fut.poll(cx) { - Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler( - handler.new_service(res.state), - Some(res.framed), - res.out, - )), - Poll::Pending => Either::Right(Poll::Pending), - Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), - }, - FramedServiceImplResponseInnerProj::Handler(fut, framed, out) => { - match fut.poll(cx) { - Poll::Ready(Ok(handler)) => { - Either::Left(FramedServiceImplResponseInner::Dispatcher( - Dispatcher::new(framed.take().unwrap(), handler, out.take()), - )) - } - Poll::Pending => Either::Right(Poll::Pending), - Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), - } - } - FramedServiceImplResponseInnerProj::Dispatcher(fut) => { - Either::Right(fut.poll(cx)) - } - } - } -} diff --git a/actix-ioframe/tests/test_server.rs b/actix-ioframe/tests/test_server.rs deleted file mode 100644 index 9d3775b3..00000000 --- a/actix-ioframe/tests/test_server.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::cell::Cell; -use std::rc::Rc; - -use actix_codec::BytesCodec; -use actix_service::{fn_factory_with_config, fn_service, IntoService, Service}; -use actix_testing::TestServer; -use actix_utils::mpsc; -use bytes::{Bytes, BytesMut}; -use futures_util::future::ok; - -use actix_ioframe::{Builder, Connect, FactoryBuilder}; - -#[derive(Clone)] -struct State(Option>); - -#[actix_rt::test] -async fn test_basic() { - let client_item = Rc::new(Cell::new(false)); - - let srv = TestServer::with(move || { - FactoryBuilder::new(fn_service(|conn: Connect<_, _>| { - ok(conn.codec(BytesCodec).state(State(None))) - })) - // echo - .build(fn_service(|t: BytesMut| ok(Some(t.freeze())))) - }); - - let item = client_item.clone(); - let mut client = Builder::new(fn_service(move |conn: Connect<_, _>| { - async move { - let (tx, rx) = mpsc::channel(); - let _ = tx.send(Bytes::from_static(b"Hello")); - Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx)))) - } - })) - .build(fn_factory_with_config(move |mut cfg: State| { - let item = item.clone(); - ok((move |t: BytesMut| { - assert_eq!(t.freeze(), Bytes::from_static(b"Hello")); - item.set(true); - // drop Sender, which will close connection - cfg.0.take(); - ok::<_, ()>(None) - }) - .into_service()) - })); - - let conn = actix_connect::default_connector() - .call(actix_connect::Connect::with(String::new(), srv.addr())) - .await - .unwrap(); - - client.call(conn.into_parts().0).await.unwrap(); - assert!(client_item.get()); -} diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 4a5ec485..0c036f67 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,8 @@ # Changes +## Unreleased - 2020-xx-xx +* Upgrade `tokio-util` to `0.3`. + ## [1.0.6] - 2020-01-08 * Add `Clone` impl for `condition::Waiter` diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 1663db9d..fef70ff0 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -6,31 +6,28 @@ use std::{fmt, mem}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; -use futures_util::{future::Future, FutureExt, stream::Stream}; +use futures_util::{future::Future, stream::Stream, FutureExt}; use log::debug; use crate::mpsc; -type Request = ::Item; -type Response = ::Item; - /// Framed transport errors -pub enum DispatcherError { +pub enum DispatcherError + Decoder, I> { Service(E), - Encoder(::Error), + Encoder(>::Error), Decoder(::Error), } -impl From for DispatcherError { +impl + Decoder, I> From for DispatcherError { fn from(err: E) -> Self { DispatcherError::Service(err) } } -impl fmt::Debug for DispatcherError +impl + Decoder, I> fmt::Debug for DispatcherError where E: fmt::Debug, - ::Error: fmt::Debug, + >::Error: fmt::Debug, ::Error: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -42,10 +39,10 @@ where } } -impl fmt::Display for DispatcherError +impl + Decoder, I> fmt::Display for DispatcherError where E: fmt::Display, - ::Error: fmt::Debug, + >::Error: fmt::Debug, ::Error: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -62,44 +59,44 @@ pub enum Message { Close, } -/// FramedTransport - is a future that reads frames from Framed object -/// and pass then to the service. +/// Dispatcher is a future that reads frames from Framed object +/// and passes them to the service. #[pin_project::pin_project] -pub struct Dispatcher +pub struct Dispatcher where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Encoder + Decoder, + I: 'static, + >::Error: std::fmt::Debug, { service: S, - state: State, + state: State, #[pin] framed: Framed, - rx: mpsc::Receiver::Item>, S::Error>>, - tx: mpsc::Sender::Item>, S::Error>>, + rx: mpsc::Receiver, S::Error>>, + tx: mpsc::Sender, S::Error>>, } -enum State { +enum State + Decoder, I> { Processing, - Error(DispatcherError), - FramedError(DispatcherError), + Error(DispatcherError), + FramedError(DispatcherError), FlushAndStop, Stopping, } -impl State { - fn take_error(&mut self) -> DispatcherError { +impl + Decoder, I> State { + fn take_error(&mut self) -> DispatcherError { match mem::replace(self, State::Processing) { State::Error(err) => err, _ => panic!(), } } - fn take_framed_error(&mut self) -> DispatcherError { + fn take_framed_error(&mut self) -> DispatcherError { match mem::replace(self, State::Processing) { State::FramedError(err) => err, _ => panic!(), @@ -107,15 +104,16 @@ impl State { } } -impl Dispatcher +impl Dispatcher where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + I: 'static, + ::Error: std::fmt::Debug, + >::Error: std::fmt::Debug, { pub fn new>(framed: Framed, service: F) -> Self { let (tx, rx) = mpsc::channel(); @@ -132,7 +130,7 @@ where pub fn with_rx>( framed: Framed, service: F, - rx: mpsc::Receiver::Item>, S::Error>>, + rx: mpsc::Receiver, S::Error>>, ) -> Self { let tx = rx.sender(); Dispatcher { @@ -145,7 +143,7 @@ where } /// Get sink - pub fn get_sink(&self) -> mpsc::Sender::Item>, S::Error>> { + pub fn get_sink(&self) -> mpsc::Sender, S::Error>> { self.tx.clone() } @@ -172,13 +170,13 @@ where fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + I: 'static, + >::Error: std::fmt::Debug, { loop { let this = self.as_mut().project(); @@ -214,13 +212,13 @@ where /// write to framed object fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + I: 'static, + >::Error: std::fmt::Debug, { loop { let mut this = self.as_mut().project(); @@ -263,18 +261,18 @@ where } } -impl Future for Dispatcher +impl Future for Dispatcher where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + I: 'static, + >::Error: std::fmt::Debug, ::Error: std::fmt::Debug, { - type Output = Result<(), DispatcherError>; + type Output = Result<(), DispatcherError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index c4d56c56..3c66accc 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -1,5 +1,5 @@ //! Actix utils - various helper services -#![deny(rust_2018_idioms, warnings)] +#![deny(rust_2018_idioms)] #![allow(clippy::type_complexity)] mod cell;