1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 11:51:50 +01:00

Upgrade tokio utils to 0.3 (#138)

This commit is contained in:
Rob Ede 2020-07-19 21:44:26 +01:00 committed by GitHub
parent a9b5a7b070
commit 334c98575a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 83 additions and 1053 deletions

View File

@ -2,7 +2,6 @@
members = [
"actix-codec",
"actix-connect",
"actix-ioframe",
"actix-rt",
"actix-macros",
"actix-service",
@ -19,7 +18,6 @@ members = [
[patch.crates-io]
actix-codec = { path = "actix-codec" }
actix-connect = { path = "actix-connect" }
actix-ioframe = { path = "actix-ioframe" }
actix-rt = { path = "actix-rt" }
actix-macros = { path = "actix-macros" }
actix-server = { path = "actix-server" }

View File

@ -1,6 +1,8 @@
# Changes
* Use `.advance()` intead of `.split_to()`
## Unreleased - 2020-xx-xx
* Use `.advance()` instead of `.split_to()`.
* Upgrade `tokio-util` to `0.3`.
## [0.2.0] - 2019-12-10
@ -8,7 +10,7 @@
## [0.2.0-alpha.4]
* Fix buffer remaining capacity calcualtion
* Fix buffer remaining capacity calculation
## [0.2.0-alpha.3]

View File

@ -21,7 +21,7 @@ bitflags = "1.2.1"
bytes = "0.5.2"
futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
tokio = { version = "0.2.4", default-features=false }
tokio-util = { version = "0.2.0", default-features=false, features=["codec"] }
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }
log = "0.4"
pin-project = "0.4.17"

View File

@ -9,8 +9,7 @@ use super::{Decoder, Encoder};
#[derive(Debug, Copy, Clone)]
pub struct BytesCodec;
impl Encoder for BytesCodec {
type Item = Bytes;
impl Encoder<Bytes> for BytesCodec {
type Error = io::Error;
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {

View File

@ -9,7 +9,9 @@ use pin_project::pin_project;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};
/// Low-water mark
const LW: usize = 1024;
/// High-water mark
const HW: usize = 8 * 1024;
bitflags::bitflags! {
@ -34,7 +36,7 @@ pub struct Framed<T, U> {
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
U: Decoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
@ -129,7 +131,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
io: self.io,
@ -140,7 +142,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2>(self, f: F) -> Framed<T2, U>
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
@ -154,7 +156,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
@ -186,10 +188,10 @@ impl<T, U> Framed<T, U> {
impl<T, U> Framed<T, U> {
/// Serialize item and Write to the inner buffer
pub fn write(mut self: Pin<&mut Self>, item: <U as Encoder>::Item) -> Result<(), <U as Encoder>::Error>
pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let this = self.as_mut().project();
let remaining = this.write_buf.capacity() - this.write_buf.len();
@ -209,7 +211,10 @@ impl<T, U> Framed<T, U> {
}
/// Try to read underlying I/O stream and decode item.
pub fn next_item(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<U::Item, U::Error>>>
pub fn next_item(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<<U as Decoder>::Item, U::Error>>>
where
T: AsyncRead,
U: Decoder,
@ -266,10 +271,10 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer to underlying I/O stream.
pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn flush<I>(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
log::trace!("flushing framed transport");
@ -277,9 +282,7 @@ impl<T, U> Framed<T, U> {
while !this.write_buf.is_empty() {
log::trace!("writing; remaining={}", this.write_buf.len());
let n = ready!(
this.io.as_mut().poll_write(cx, this.write_buf)
)?;
let n = ready!(this.io.as_mut().poll_write(cx, this.write_buf))?;
if n == 0 {
return Poll::Ready(Err(io::Error::new(
@ -301,10 +304,10 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer and shutdown underlying I/O stream.
pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn close<I>(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
ready!(this.io.as_mut().poll_flush(cx))?;
@ -325,10 +328,10 @@ where
}
}
impl<T, U> Sink<U::Item> for Framed<T, U>
impl<T, U, I> Sink<I> for Framed<T, U>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
U::Error: From<io::Error>,
{
type Error = U::Error;
@ -341,24 +344,15 @@ where
}
}
fn start_send(
self: Pin<&mut Self>,
item: <U as Encoder>::Item,
) -> Result<(), Self::Error> {
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
self.write(item)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.flush(cx)
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close(cx)
}
}

View File

@ -8,7 +8,7 @@
//! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream
#![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms)]
mod bcodec;
mod framed;

View File

@ -1,33 +0,0 @@
# Changes
## [0.5.0] - 2019-12-29
* Simplify state management
* Allow to set custom output stream
* Removed disconnect callback
## [0.4.1] - 2019-12-11
* Disconnect callback accepts owned state
## [0.4.0] - 2019-12-11
* Remove `E` param
## [0.3.0-alpha.3] - 2019-12-07
* Migrate to tokio 0.2
## [0.3.0-alpha.2] - 2019-12-02
* Migrate to `std::future`
## [0.1.1] - 2019-10-14
* Re-register task on every dispatcher poll.
## [0.1.0] - 2019-09-25
* Initial release

View File

@ -1,33 +0,0 @@
[package]
name = "actix-ioframe"
version = "0.5.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix framed service"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-ioframe/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]
name = "actix_ioframe"
path = "src/lib.rs"
[dependencies]
actix-service = "1.0.1"
actix-codec = "0.2.0"
actix-utils = "1.0.4"
actix-rt = "1.0.0"
bytes = "0.5.3"
either = "1.5.3"
futures-sink = { version = "0.3.4", default-features = false }
futures-core = { version = "0.3.4", default-features = false }
pin-project = "0.4.17"
log = "0.4"
[dev-dependencies]
actix-connect = "2.0.0-alpha.2"
actix-testing = "1.0.0"
futures-util = { version = "0.3.4", default-features = false }

View File

@ -1 +0,0 @@
../LICENSE-APACHE

View File

@ -1 +0,0 @@
../LICENSE-MIT

3
actix-ioframe/README.md Normal file
View File

@ -0,0 +1,3 @@
# actix-ioframe
**This crate has been deprecated and removed.**

View File

@ -1,123 +0,0 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_utils::mpsc::Receiver;
use futures_core::stream::Stream;
pub struct Connect<Io, Codec>
where
Codec: Encoder + Decoder,
{
io: Io,
_t: PhantomData<Codec>,
}
impl<Io, Codec> Connect<Io, Codec>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
pub(crate) fn new(io: Io) -> Self {
Self {
io,
_t: PhantomData,
}
}
pub fn codec(
self,
codec: Codec,
) -> ConnectResult<Io, (), Codec, Receiver<<Codec as Encoder>::Item>> {
ConnectResult {
state: (),
out: None,
framed: Framed::new(self.io, codec),
}
}
}
#[pin_project::pin_project]
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder, Out> {
pub(crate) state: St,
pub(crate) out: Option<Out>,
#[pin]
pub(crate) framed: Framed<Io, Codec>,
}
impl<Io, St, Codec: Encoder + Decoder, Out: Unpin> ConnectResult<Io, St, Codec, Out> {
#[inline]
pub fn get_ref(&self) -> &Io {
self.framed.get_ref()
}
#[inline]
pub fn get_mut(&mut self) -> &mut Io {
self.framed.get_mut()
}
pub fn out<U>(self, out: U) -> ConnectResult<Io, St, Codec, U>
where
U: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
ConnectResult {
state: self.state,
framed: self.framed,
out: Some(out),
}
}
#[inline]
pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec, Out> {
ConnectResult {
state,
framed: self.framed,
out: self.out,
}
}
}
impl<Io, St, Codec, Out> Stream for ConnectResult<Io, St, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().framed.next_item(cx)
}
}
impl<Io, St, Codec, Out> futures_sink::Sink<<Codec as Encoder>::Item>
for ConnectResult<Io, St, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type Error = <Codec as Encoder>::Error;
fn poll_ready(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.as_mut().project().framed.is_write_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn start_send(
self: Pin<&mut Self>,
item: <Codec as Encoder>::Item,
) -> Result<(), Self::Error> {
self.project().framed.write(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.as_mut().project().framed.flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.as_mut().project().framed.close(cx)
}
}

View File

@ -1,248 +0,0 @@
//! Framed dispatcher service and related utilities
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::Service;
use actix_utils::mpsc;
use futures_core::stream::Stream;
use pin_project::pin_project;
use log::debug;
use crate::error::ServiceError;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
#[pin_project]
pub(crate) struct Dispatcher<S, T, U, Out>
where
S: Service<Request = Request<U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <U as Encoder>::Item> + Unpin,
{
service: S,
sink: Option<Out>,
state: FramedState<S, U>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<<U as Encoder>::Item, S::Error>>,
}
impl<S, T, U, Out> Dispatcher<S, T, U, Out>
where
S: Service<Request = Request<U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <U as Encoder>::Item> + Unpin,
{
pub(crate) fn new(framed: Framed<T, U>, service: S, sink: Option<Out>) -> Self {
Dispatcher {
sink,
service,
framed,
rx: mpsc::channel().1,
state: FramedState::Processing,
}
}
}
enum FramedState<S: Service, U: Encoder + Decoder> {
Processing,
Error(ServiceError<S::Error, U>),
FramedError(ServiceError<S::Error, U>),
FlushAndStop,
Stopping,
}
impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
fn take_error(&mut self) -> ServiceError<S::Error, U> {
match std::mem::replace(self, FramedState::Processing) {
FramedState::Error(err) => err,
_ => panic!(),
}
}
fn take_framed_error(&mut self) -> ServiceError<S::Error, U> {
match std::mem::replace(self, FramedState::Processing) {
FramedState::FramedError(err) => err,
_ => panic!(),
}
}
}
impl<S, T, U, Out> Dispatcher<S, T, U, Out>
where
S: Service<Request = Request<U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <U as Encoder>::Item> + Unpin,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
loop {
let this = self.as_mut().project();
match this.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
let item = match this.framed.next_item(cx) {
Poll::Ready(Some(Ok(el))) => el,
Poll::Ready(Some(Err(err))) => {
*this.state = FramedState::FramedError(ServiceError::Decoder(err));
return true;
}
Poll::Pending => return false,
Poll::Ready(None) => {
log::trace!("Client disconnected");
*this.state = FramedState::Stopping;
return true;
}
};
let tx = this.rx.sender();
let fut = this.service.call(item);
actix_rt::spawn(async move {
let item = fut.await;
let item = match item {
Ok(Some(item)) => Ok(item),
Ok(None) => return,
Err(err) => Err(err),
};
let _ = tx.send(item);
});
}
Poll::Pending => return false,
Poll::Ready(Err(err)) => {
*this.state = FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
}
/// write to framed object
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
loop {
let mut this = self.as_mut().project();
while !this.framed.is_write_buf_full() {
match Pin::new(&mut this.rx).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => {
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state = FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
continue;
}
Poll::Ready(Some(Err(err))) => {
*this.state = FramedState::Error(ServiceError::Service(err));
return true;
}
Poll::Ready(None) | Poll::Pending => (),
}
if this.sink.is_some() {
match Pin::new(this.sink.as_mut().unwrap()).poll_next(cx) {
Poll::Ready(Some(msg)) => {
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
continue;
}
Poll::Ready(None) => {
let _ = this.sink.take();
*this.state = FramedState::FlushAndStop;
return true;
}
Poll::Pending => (),
}
}
break;
}
if !this.framed.is_write_buf_empty() {
match this.framed.as_mut().flush(cx) {
Poll::Pending => break,
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
*this.state = FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
}
} else {
break;
}
}
false
}
pub(crate) fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), ServiceError<S::Error, U>>> {
let mut this = self.as_mut().project();
match this.state {
FramedState::Processing => loop {
let read = self.as_mut().poll_read(cx);
let write = self.as_mut().poll_write(cx);
if read || write {
continue;
} else {
return Poll::Pending;
}
},
FramedState::Error(_) => {
// flush write buffer
if !this.framed.is_write_buf_empty() {
if let Poll::Pending = this.framed.flush(cx) {
return Poll::Pending;
}
}
Poll::Ready(Err(this.state.take_error()))
}
FramedState::FlushAndStop => {
// drain service responses
match Pin::new(this.rx).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => {
if this.framed.as_mut().write(msg).is_err() {
return Poll::Ready(Ok(()));
}
}
Poll::Ready(Some(Err(_))) => return Poll::Ready(Ok(())),
Poll::Ready(None) | Poll::Pending => (),
}
// flush io
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
}
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(_) => (),
}
};
Poll::Ready(Ok(()))
}
FramedState::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
FramedState::Stopping => Poll::Ready(Ok(())),
}
}
}

View File

@ -1,49 +0,0 @@
use std::fmt;
use actix_codec::{Decoder, Encoder};
/// Framed service errors
pub enum ServiceError<E, U: Encoder + Decoder> {
/// Inner service error
Service(E),
/// Encoder parse error
Encoder(<U as Encoder>::Error),
/// Decoder parse error
Decoder(<U as Decoder>::Error),
}
impl<E, U: Encoder + Decoder> From<E> for ServiceError<E, U> {
fn from(err: E) -> Self {
ServiceError::Service(err)
}
}
impl<E, U: Encoder + Decoder> fmt::Debug for ServiceError<E, U>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
ServiceError::Service(ref e) => write!(fmt, "ServiceError::Service({:?})", e),
ServiceError::Encoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e),
ServiceError::Decoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e),
}
}
}
impl<E, U: Encoder + Decoder> fmt::Display for ServiceError<E, U>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
ServiceError::Service(ref e) => write!(fmt, "{}", e),
ServiceError::Encoder(ref e) => write!(fmt, "{:?}", e),
ServiceError::Decoder(ref e) => write!(fmt, "{:?}", e),
}
}
}

View File

@ -1,11 +0,0 @@
// #![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity, clippy::too_many_arguments)]
mod connect;
mod dispatcher;
mod error;
mod service;
pub use self::connect::{Connect, ConnectResult};
pub use self::error::ServiceError;
pub use self::service::{Builder, FactoryBuilder};

View File

@ -1,413 +0,0 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use either::Either;
use futures_core::{ready, stream::Stream};
use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::Dispatcher;
use crate::error::ServiceError;
type RequestItem<U> = <U as Decoder>::Item;
type ResponseItem<U> = Option<<U as Encoder>::Item>;
/// Service builder - structure that follows the builder pattern
/// for building instances for framed services.
pub struct Builder<St, C, Io, Codec, Out> {
connect: C,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, Io, Codec, Out> Builder<St, C, Io, Codec, Out>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
Io: AsyncRead + AsyncWrite,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
/// Construct framed handler service with specified connect service
pub fn new<F>(connect: F) -> Builder<St, C, Io, Codec, Out>
where
F: IntoService<C>,
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
Codec: Decoder + Encoder,
Out: Stream<Item = <Codec as Encoder>::Item>,
{
Builder {
connect: connect.into_service(),
_t: PhantomData,
}
}
/// Provide stream items handler service and construct service factory.
pub fn build<F, T>(self, service: F) -> FramedServiceImpl<St, C, T, Io, Codec, Out>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
{
FramedServiceImpl {
connect: self.connect,
handler: Rc::new(service.into_factory()),
_t: PhantomData,
}
}
}
/// Service builder - structure that follows the builder pattern
/// for building instances for framed services.
pub struct FactoryBuilder<St, C, Io, Codec, Out> {
connect: C,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, Io, Codec, Out> FactoryBuilder<St, C, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
Codec: Decoder + Encoder,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
/// Construct framed handler new service with specified connect service
pub fn new<F>(connect: F) -> FactoryBuilder<St, C, Io, Codec, Out>
where
F: IntoServiceFactory<C>,
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
Codec: Decoder + Encoder,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
FactoryBuilder {
connect: connect.into_factory(),
_t: PhantomData,
}
}
pub fn build<F, T, Cfg>(self, service: F) -> FramedService<St, C, T, Io, Codec, Out, Cfg>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
{
FramedService {
connect: self.connect,
handler: Rc::new(service.into_factory()),
_t: PhantomData,
}
}
}
pub struct FramedService<St, C, T, Io, Codec, Out, Cfg> {
connect: C,
handler: Rc<T>,
_t: PhantomData<(St, Io, Codec, Out, Cfg)>,
}
impl<St, C, T, Io, Codec, Out, Cfg> ServiceFactory
for FramedService<St, C, T, Io, Codec, Out, Cfg>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Config = Cfg;
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type InitError = C::InitError;
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec, Out>;
type Future = FramedServiceResponse<St, C, T, Io, Codec, Out>;
fn new_service(&self, _: Cfg) -> Self::Future {
// create connect service and then create service impl
FramedServiceResponse {
fut: self.connect.new_service(()),
handler: self.handler.clone(),
}
}
}
#[pin_project::pin_project]
pub struct FramedServiceResponse<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
#[pin]
fut: C::Future,
handler: Rc<T>,
}
impl<St, C, T, Io, Codec, Out> Future for FramedServiceResponse<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Output = Result<FramedServiceImpl<St, C::Service, T, Io, Codec, Out>, C::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let connect = ready!(this.fut.poll(cx))?;
Poll::Ready(Ok(FramedServiceImpl {
connect,
handler: this.handler.clone(),
_t: PhantomData,
}))
}
}
pub struct FramedServiceImpl<St, C, T, Io, Codec, Out> {
connect: C,
handler: Rc<T>,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, T, Io, Codec, Out> Service for FramedServiceImpl<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type Future = FramedServiceImplResponse<St, Io, Codec, Out, C, T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.connect.poll_ready(cx).map_err(|e| e.into())
}
fn call(&mut self, req: Io) -> Self::Future {
FramedServiceImplResponse {
inner: FramedServiceImplResponseInner::Connect(
self.connect.call(Connect::new(req)),
self.handler.clone(),
),
}
}
}
#[pin_project::pin_project]
pub struct FramedServiceImplResponse<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
#[pin]
inner: FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>,
}
impl<St, Io, Codec, Out, C, T> Future for FramedServiceImplResponse<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Output = Result<(), ServiceError<C::Error, Codec>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
loop {
match this.inner.poll(cx) {
Either::Left(new) => {
this = self.as_mut().project();
this.inner.set(new)
}
Either::Right(poll) => return poll,
};
}
}
}
#[pin_project::pin_project(project = FramedServiceImplResponseInnerProj)]
enum FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
Connect(#[pin] C::Future, Rc<T>),
Handler(#[pin] T::Future, Option<Framed<Io, Codec>>, Option<Out>),
Dispatcher(#[pin] Dispatcher<T::Service, Io, Codec, Out>),
}
impl<St, Io, Codec, Out, C, T> FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Either<
FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>,
Poll<Result<(), ServiceError<C::Error, Codec>>>,
> {
match self.project() {
FramedServiceImplResponseInnerProj::Connect(fut, handler) => match fut.poll(cx) {
Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler(
handler.new_service(res.state),
Some(res.framed),
res.out,
)),
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
},
FramedServiceImplResponseInnerProj::Handler(fut, framed, out) => {
match fut.poll(cx) {
Poll::Ready(Ok(handler)) => {
Either::Left(FramedServiceImplResponseInner::Dispatcher(
Dispatcher::new(framed.take().unwrap(), handler, out.take()),
))
}
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
}
}
FramedServiceImplResponseInnerProj::Dispatcher(fut) => {
Either::Right(fut.poll(cx))
}
}
}
}

View File

@ -1,55 +0,0 @@
use std::cell::Cell;
use std::rc::Rc;
use actix_codec::BytesCodec;
use actix_service::{fn_factory_with_config, fn_service, IntoService, Service};
use actix_testing::TestServer;
use actix_utils::mpsc;
use bytes::{Bytes, BytesMut};
use futures_util::future::ok;
use actix_ioframe::{Builder, Connect, FactoryBuilder};
#[derive(Clone)]
struct State(Option<mpsc::Sender<Bytes>>);
#[actix_rt::test]
async fn test_basic() {
let client_item = Rc::new(Cell::new(false));
let srv = TestServer::with(move || {
FactoryBuilder::new(fn_service(|conn: Connect<_, _>| {
ok(conn.codec(BytesCodec).state(State(None)))
}))
// echo
.build(fn_service(|t: BytesMut| ok(Some(t.freeze()))))
});
let item = client_item.clone();
let mut client = Builder::new(fn_service(move |conn: Connect<_, _>| {
async move {
let (tx, rx) = mpsc::channel();
let _ = tx.send(Bytes::from_static(b"Hello"));
Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx))))
}
}))
.build(fn_factory_with_config(move |mut cfg: State| {
let item = item.clone();
ok((move |t: BytesMut| {
assert_eq!(t.freeze(), Bytes::from_static(b"Hello"));
item.set(true);
// drop Sender, which will close connection
cfg.0.take();
ok::<_, ()>(None)
})
.into_service())
}));
let conn = actix_connect::default_connector()
.call(actix_connect::Connect::with(String::new(), srv.addr()))
.await
.unwrap();
client.call(conn.into_parts().0).await.unwrap();
assert!(client_item.get());
}

View File

@ -1,5 +1,8 @@
# Changes
## Unreleased - 2020-xx-xx
* Upgrade `tokio-util` to `0.3`.
## [1.0.6] - 2020-01-08
* Add `Clone` impl for `condition::Waiter`

View File

@ -6,31 +6,28 @@ use std::{fmt, mem};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures_util::{future::Future, FutureExt, stream::Stream};
use futures_util::{future::Future, stream::Stream, FutureExt};
use log::debug;
use crate::mpsc;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
/// Framed transport errors
pub enum DispatcherError<E, U: Encoder + Decoder> {
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
Service(E),
Encoder(<U as Encoder>::Error),
Encoder(<U as Encoder<I>>::Error),
Decoder(<U as Decoder>::Error),
}
impl<E, U: Encoder + Decoder> From<E> for DispatcherError<E, U> {
impl<E, U: Encoder<I> + Decoder, I> From<E> for DispatcherError<E, U, I> {
fn from(err: E) -> Self {
DispatcherError::Service(err)
}
}
impl<E, U: Encoder + Decoder> fmt::Debug for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Debug for DispatcherError<E, U, I>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -42,10 +39,10 @@ where
}
}
impl<E, U: Encoder + Decoder> fmt::Display for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Display for DispatcherError<E, U, I>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -62,44 +59,44 @@ pub enum Message<T> {
Close,
}
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
#[pin_project::pin_project]
pub struct Dispatcher<S, T, U>
pub struct Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Encoder<I> + Decoder,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
service: S,
state: State<S, U>,
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
tx: mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
}
enum State<S: Service, U: Encoder + Decoder> {
enum State<S: Service, U: Encoder<I> + Decoder, I> {
Processing,
Error(DispatcherError<S::Error, U>),
FramedError(DispatcherError<S::Error, U>),
Error(DispatcherError<S::Error, U, I>),
FramedError(DispatcherError<S::Error, U, I>),
FlushAndStop,
Stopping,
}
impl<S: Service, U: Encoder + Decoder> State<S, U> {
fn take_error(&mut self) -> DispatcherError<S::Error, U> {
impl<S: Service, U: Encoder<I> + Decoder, I> State<S, U, I> {
fn take_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::Error(err) => err,
_ => panic!(),
}
}
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U> {
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::FramedError(err) => err,
_ => panic!(),
@ -107,15 +104,16 @@ impl<S: Service, U: Encoder + Decoder> State<S, U> {
}
}
impl<S, T, U> Dispatcher<S, T, U>
impl<S, T, U, I> Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Decoder>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
let (tx, rx) = mpsc::channel();
@ -132,7 +130,7 @@ where
pub fn with_rx<F: IntoService<S>>(
framed: Framed<T, U>,
service: F,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self {
let tx = rx.sender();
Dispatcher {
@ -145,7 +143,7 @@ where
}
/// Get sink
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>> {
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
self.tx.clone()
}
@ -172,13 +170,13 @@ where
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let this = self.as_mut().project();
@ -214,13 +212,13 @@ where
/// write to framed object
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let mut this = self.as_mut().project();
@ -263,18 +261,18 @@ where
}
}
impl<S, T, U> Future for Dispatcher<S, T, U>
impl<S, T, U, I> Future for Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Decoder>::Error: std::fmt::Debug,
{
type Output = Result<(), DispatcherError<S::Error, U>>;
type Output = Result<(), DispatcherError<S::Error, U, I>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {

View File

@ -1,5 +1,5 @@
//! Actix utils - various helper services
#![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]
mod cell;