1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-12 15:37:05 +02:00

Compare commits

..

13 Commits

Author SHA1 Message Date
Nikolay Kim
c094f84b85 prepare actix-service release 2019-12-11 10:29:34 +06:00
Nikolay Kim
25012d290a update actix-codec dependencies 2019-12-11 10:23:01 +06:00
Nikolay Kim
32202188cc prepare actix-codec release 2019-12-11 10:18:11 +06:00
Nikolay Kim
bf734a31dc update docs 2019-12-10 21:34:51 +06:00
Nikolay Kim
d29e7c4ba6 Merge branch 'master' of github.com:actix/actix-net 2019-12-10 21:14:18 +06:00
Nikolay Kim
7163e2c2a2 update doc strings 2019-12-10 21:14:06 +06:00
Nikolay Kim
1d810b4561 re-export AlpnError 2019-12-10 12:15:27 +06:00
daxpedda
0913badd61 Macro improvements. (#74)
* Macro improvements.

* Fix usage in `fn main`.
2019-12-10 08:47:35 +06:00
Nikolay Kim
8b3062cd6e Fix buffer remaining capacity calcualtion 2019-12-09 21:50:36 +06:00
Nikolay Kim
35218a4df1 add Clone impl for Apply service 2019-12-09 14:07:20 +06:00
Nikolay Kim
d47f1fb730 prepare actix-service release 2019-12-08 19:49:35 +06:00
Nikolay Kim
1ad0bbfb7f rename fn service helpers 2019-12-08 19:05:05 +06:00
Nikolay Kim
c38a25f102 fix hash impl 2019-12-07 11:51:47 +06:00
36 changed files with 414 additions and 135 deletions

View File

@@ -30,14 +30,16 @@ fn main() -> io::Result<()> {
let num = num.clone();
let acceptor = acceptor.clone();
// service for converting incoming TcpStream to a SslStream<TcpStream>
fn_service(move |stream: Io<tokio_tcp::TcpStream>| {
SslAcceptorExt::accept_async(&acceptor, stream.into_parts().0)
.map_err(|e| println!("Openssl error: {}", e))
})
// .and_then() combinator uses other service to convert incoming `Request` to a
// `Response` and then uses that response as an input for next
// service. in this case, on success we use `logger` service
// construct transformation pipeline
pipeline(
// service for converting incoming TcpStream to a SslStream<TcpStream>
fn_service(move |stream: actix_rt::net::TcpStream| async move {
SslAcceptorExt::accept_async(&acceptor, stream.into_parts().0).await
.map_err(|e| println!("Openssl error: {}", e))
}))
// .and_then() combinator chains result of previos service call to argument
/// for next service calll. in this case, on success we chain
/// ssl stream to the `logger` service.
.and_then(fn_service(logger))
// Next service counts number of connections
.and_then(move |_| {

View File

@@ -1,5 +1,13 @@
# Changes
## [0.2.0] - 2019-12-10
* Use specific futures dependencies
## [0.2.0-alpha.4]
* Fix buffer remaining capacity calcualtion
## [0.2.0-alpha.3]
* Use tokio 0.2
@@ -10,17 +18,14 @@
* Migrated to `std::future`
## [0.1.2] - 2019-03-27
* Added `Framed::map_io()` method.
## [0.1.1] - 2019-03-06
* Added `FramedParts::with_read_buffer()` method.
## [0.1.0] - 2018-12-09
* Move codec to separate crate

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-codec"
version = "0.2.0-alpha.3"
version = "0.2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@@ -9,7 +9,6 @@ repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-codec/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = ".."
@@ -20,7 +19,8 @@ path = "src/lib.rs"
[dependencies]
bitflags = "1.2.1"
bytes = "0.5.2"
futures = "0.3.1"
futures-core = "0.3.1"
futures-sink = "0.3.1"
tokio = { version = "0.2.4", default-features=false }
tokio-util = { version = "0.2.0", default-features=false, features=["codec"] }
log = "0.4"

View File

@@ -2,8 +2,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io};
use bytes::{BufMut, BytesMut};
use futures::{ready, Sink, Stream};
use bytes::BytesMut;
use futures_core::{ready, Stream};
use futures_sink::Sink;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};
@@ -19,8 +20,6 @@ bitflags::bitflags! {
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
pub struct Framed<T, U> {
io: T,
codec: U,
@@ -49,10 +48,6 @@ where
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
pub fn new(io: T, codec: U) -> Framed<T, U> {
Framed {
io,
@@ -82,10 +77,6 @@ impl<T, U> Framed<T, U> {
/// This objects takes a stream and a readbuffer and a writebuffer. These
/// field can be obtained from an existing `Framed` with the
/// `into_parts` method.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
@@ -136,15 +127,6 @@ impl<T, U> Framed<T, U> {
self.write_buf.len() >= HW
}
/// Consumes the `Frame`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
self.io
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
Framed {
@@ -208,7 +190,7 @@ impl<T, U> Framed<T, U> {
T: AsyncWrite,
U: Encoder,
{
let remaining = self.write_buf.remaining_mut();
let remaining = self.write_buf.capacity() - self.write_buf.len();
if remaining < LW {
self.write_buf.reserve(HW - remaining);
}
@@ -217,11 +199,14 @@ impl<T, U> Framed<T, U> {
Ok(())
}
/// Check if framed is able to write more data
pub fn is_ready(&self) -> bool {
/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}
/// Try to read underlying I/O stream and decode item.
pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<U::Item, U::Error>>>
where
T: AsyncRead,
@@ -251,9 +236,7 @@ impl<T, U> Framed<T, U> {
return Poll::Ready(Some(Ok(frame)));
}
Err(e) => return Poll::Ready(Some(Err(e))),
_ => {
// Need more data
}
_ => (), // Need more data
}
self.flags.remove(Flags::READABLE);
@@ -262,7 +245,7 @@ impl<T, U> Framed<T, U> {
debug_assert!(!self.flags.contains(Flags::EOF));
// Otherwise, try to read more data and try again. Make sure we've got room
let remaining = self.read_buf.remaining_mut();
let remaining = self.read_buf.capacity() - self.read_buf.len();
if remaining < LW {
self.read_buf.reserve(HW - remaining)
}
@@ -281,6 +264,7 @@ impl<T, U> Framed<T, U> {
}
}
/// Flush write buffer to underlying I/O stream.
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
@@ -298,14 +282,12 @@ impl<T, U> Framed<T, U> {
if n == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to \
write frame to transport",
"failed to write frame to transport",
)
.into()));
}
// TODO: Add a way to `bytes` to do this w/o returning the drained
// data.
// remove written data
let _ = self.write_buf.split_to(n);
}
@@ -316,6 +298,7 @@ impl<T, U> Framed<T, U> {
Poll::Ready(Ok(()))
}
/// Flush write buffer and shutdown underlying I/O stream.
pub fn close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
@@ -350,7 +333,7 @@ where
type Error = U::Error;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.is_ready() {
if self.is_write_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending

View File

@@ -7,7 +7,6 @@
//! [`AsyncRead`]: #
//! [`AsyncWrite`]: #
#![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)]
mod bcodec;
mod framed;

View File

@@ -34,7 +34,7 @@ uri = ["http"]
[dependencies]
actix-service = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.3"
actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.3"
derive_more = "0.99.2"

View File

@@ -2,7 +2,7 @@ use std::io;
use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream;
use actix_service::{service_fn, Service, ServiceFactory};
use actix_service::{fn_service, Service, ServiceFactory};
use actix_testing::TestServer;
use bytes::Bytes;
use futures::SinkExt;
@@ -14,7 +14,7 @@ use actix_connect::Connect;
#[actix_rt::test]
async fn test_string() {
let srv = TestServer::with(|| {
service_fn(|io: TcpStream| {
fn_service(|io: TcpStream| {
async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -33,7 +33,7 @@ async fn test_string() {
#[actix_rt::test]
async fn test_rustls_string() {
let srv = TestServer::with(|| {
service_fn(|io: TcpStream| {
fn_service(|io: TcpStream| {
async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -51,7 +51,7 @@ async fn test_rustls_string() {
#[actix_rt::test]
async fn test_static_str() {
let srv = TestServer::with(|| {
service_fn(|io: TcpStream| {
fn_service(|io: TcpStream| {
async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -75,7 +75,7 @@ async fn test_static_str() {
#[actix_rt::test]
async fn test_new_service() {
let srv = TestServer::with(|| {
service_fn(|io: TcpStream| {
fn_service(|io: TcpStream| {
async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -100,7 +100,7 @@ async fn test_uri() {
use std::convert::TryFrom;
let srv = TestServer::with(|| {
service_fn(|io: TcpStream| {
fn_service(|io: TcpStream| {
async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -121,7 +121,7 @@ async fn test_rustls_uri() {
use std::convert::TryFrom;
let srv = TestServer::with(|| {
service_fn(|io: TcpStream| {
fn_service(|io: TcpStream| {
async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;

View File

@@ -19,7 +19,7 @@ path = "src/lib.rs"
[dependencies]
actix-service = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.2"
actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.2"
actix-rt = "1.0.0-alpha.2"
bytes = "0.5"

View File

@@ -96,7 +96,7 @@ where
type Error = <Codec as Encoder>::Error;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.framed.is_ready() {
if self.framed.is_write_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending

View File

@@ -4,7 +4,7 @@ use std::time::Duration;
use actix_codec::BytesCodec;
use actix_rt::time::delay_for;
use actix_service::{service_fn, Service};
use actix_service::{fn_service, Service};
use actix_testing::TestServer;
use futures::future::ok;
@@ -22,13 +22,13 @@ async fn test_disconnect() -> std::io::Result<()> {
let disconnect1 = disconnect1.clone();
Builder::new()
.factory(service_fn(|conn: Connect<_>| {
.factory(fn_service(|conn: Connect<_>| {
ok(conn.codec(BytesCodec).state(State))
}))
.disconnect(move |_, _| {
disconnect1.store(true, Ordering::Relaxed);
})
.finish(service_fn(|_t| ok(None)))
.finish(fn_service(|_t| ok(None)))
});
let mut client = Builder::new()
@@ -37,7 +37,7 @@ async fn test_disconnect() -> std::io::Result<()> {
conn.sink().close();
ok(conn)
})
.finish(service_fn(|_t| ok(None)));
.finish(fn_service(|_t| ok(None)));
let conn = actix_connect::default_connector()
.call(actix_connect::Connect::with(String::new(), srv.addr()))

View File

@@ -17,25 +17,26 @@ use quote::quote;
#[proc_macro_attribute]
#[cfg(not(test))] // Work around for rust-lang/rust#62127
pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(item as syn::ItemFn);
let ret = &input.sig.output;
let name = &input.sig.ident;
let inputs = &input.sig.inputs;
let body = &input.block;
let mut input = syn::parse_macro_input!(item as syn::ItemFn);
let attrs = &input.attrs;
let vis = &input.vis;
let sig = &mut input.sig;
let body = &input.block;
let name = &sig.ident;
if input.sig.asyncness.is_none() {
return syn::Error::new_spanned(input.sig.fn_token, "only async fn is supported")
if sig.asyncness.is_none() {
return syn::Error::new_spanned(sig.fn_token, "only async fn is supported")
.to_compile_error()
.into();
}
sig.asyncness = None;
(quote! {
#(#attrs)*
fn #name(#inputs) #ret {
actix_rt::System::new("main")
.block_on(async { #body })
#vis #sig {
actix_rt::System::new(stringify!(#name))
.block_on(async move { #body })
}
})
.into()

View File

@@ -1,5 +1,11 @@
# Changes
## [1.0.0-alpha.4] - 2019-12-08
### Changed
* Use actix-service 1.0.0-alpha.4
## [1.0.0-alpha.3] - 2019-12-07
### Changed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "1.0.0-alpha.3"
version = "1.0.0-alpha.4"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"]
@@ -21,9 +21,9 @@ path = "src/lib.rs"
default = []
[dependencies]
actix-service = "1.0.0-alpha.3"
actix-service = "1.0.0-alpha.4"
actix-rt = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.3"
actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.3"
log = "0.4"

View File

@@ -142,7 +142,7 @@ impl InternalServiceFactory for ConfiguredService {
let name = names.remove(&token).unwrap().0;
res.push((
token,
Box::new(StreamService::new(actix::service_fn2(
Box::new(StreamService::new(actix::fn_service(
move |_: TcpStream| {
error!("Service {:?} is not configured", name);
ok::<_, ()>(())

View File

@@ -6,7 +6,7 @@ use std::{net, thread, time};
use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::service_fn;
use actix_service::fn_service;
use bytes::Bytes;
use futures::future::{lazy, ok};
use futures::SinkExt;
@@ -31,7 +31,7 @@ fn test_bind() {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || service_fn(|_| ok::<_, ()>(())))
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.start();
let _ = tx.send((srv, actix_rt::System::current()));
@@ -56,7 +56,7 @@ fn test_listen() {
Server::build()
.disable_signals()
.workers(1)
.listen("test", lst, move || service_fn(|_| ok::<_, ()>(())))
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.start();
let _ = tx.send(actix_rt::System::current());
@@ -82,7 +82,7 @@ fn test_start() {
.backlog(100)
.disable_signals()
.bind("test", addr, move || {
service_fn(|io: TcpStream| {
fn_service(|io: TcpStream| {
async move {
let mut f = Framed::new(io, BytesCodec);
f.send(Bytes::from_static(b"test")).await.unwrap();
@@ -158,8 +158,8 @@ fn test_configure() {
.listen("addr3", lst)
.apply(move |rt| {
let num = num.clone();
rt.service("addr1", service_fn(|_| ok::<_, ()>(())));
rt.service("addr3", service_fn(|_| ok::<_, ()>(())));
rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
rt.on_start(lazy(move |_| {
let _ = num.fetch_add(1, Relaxed);
}))

View File

@@ -1,5 +1,23 @@
# Changes
## [1.0.0] - 2019-12-11
### Added
* Add Clone impl for Apply service
## [1.0.0-alpha.4] - 2019-12-08
### Changed
* Renamed `service_fn` to `fn_service`
* Renamed `factory_fn` to `fn_factory`
* Renamed `factory_fn_cfg` to `fn_factory_with_config`
## [1.0.0-alpha.3] - 2019-12-06
### Changed

View File

@@ -1,15 +1,14 @@
[package]
name = "actix-service"
version = "1.0.0-alpha.3"
version = "1.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service"
description = "Actix service"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-service/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = ".."
@@ -27,4 +26,4 @@ futures-util = "0.3.1"
pin-project = "0.4.6"
[dev-dependencies]
actix-rt = "1.0.0-alpha.2"
actix-rt = "1.0.0-alpha.3"

View File

@@ -258,7 +258,7 @@ mod tests {
use futures_util::future::{lazy, ok, ready, Ready};
use crate::{factory_fn, pipeline, pipeline_factory, Service, ServiceFactory};
use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory};
struct Srv1(Rc<Cell<usize>>);
@@ -320,7 +320,7 @@ mod tests {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let new_srv =
pipeline_factory(factory_fn(move || ready(Ok::<_, ()>(Srv1(cnt2.clone())))))
pipeline_factory(fn_factory(move || ready(Ok::<_, ()>(Srv1(cnt2.clone())))))
.and_then(move || ready(Ok(Srv2(cnt.clone()))));
let mut srv = new_srv.new_service(()).await.unwrap();

View File

@@ -283,7 +283,7 @@ mod tests {
use futures_util::future::{lazy, ok, Ready, TryFutureExt};
use crate::{pipeline, pipeline_factory, service_fn2, Service, ServiceFactory};
use crate::{fn_service, pipeline, pipeline_factory, Service, ServiceFactory};
#[derive(Clone)]
struct Srv;
@@ -318,7 +318,7 @@ mod tests {
#[actix_rt::test]
async fn test_service_factory() {
let new_srv = pipeline_factory(|| ok::<_, ()>(service_fn2(|r: &'static str| ok(r))))
let new_srv = pipeline_factory(|| ok::<_, ()>(fn_service(|r: &'static str| ok(r))))
.and_then_apply_fn(
|| ok(Srv),
|req: &'static str, s| s.call(()).map_ok(move |res| (req, res)),

View File

@@ -5,7 +5,7 @@ use std::task::{Context, Poll};
use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Apply tranform function to a service
/// Apply tranform function to a service.
pub fn apply_fn<T, F, R, In, Out, Err, U>(service: U, f: F) -> Apply<T, F, R, In, Out, Err>
where
T: Service<Error = Err>,
@@ -16,7 +16,7 @@ where
Apply::new(service.into_service(), f)
}
/// Create factory for `apply` service.
/// Service factory that prodices `apply_fn` service.
pub fn apply_fn_factory<T, F, R, In, Out, Err, U>(
service: U,
f: F,
@@ -56,6 +56,21 @@ where
}
}
impl<T, F, R, In, Out, Err> Clone for Apply<T, F, R, In, Out, Err>
where
T: Service<Error = Err> + Clone,
F: FnMut(In, &mut T) -> R + Clone,
R: Future<Output = Result<Out, Err>>,
{
fn clone(&self) -> Self {
Apply {
service: self.service.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<T, F, R, In, Out, Err> Service for Apply<T, F, R, In, Out, Err>
where
T: Service<Error = Err>,

View File

@@ -6,7 +6,7 @@ use std::task::{Context, Poll};
use crate::cell::Cell;
use crate::{Service, ServiceFactory};
/// Convert `Fn(&Config, &mut Service) -> Future<Service>` fn to a NewService
/// Convert `Fn(Config, &mut Service1) -> Future<Service2>` fn to a service factory
pub fn apply_cfg<F, C, T, R, S, E>(srv: T, f: F) -> ApplyConfigService<F, C, T, R, S, E>
where
F: FnMut(C, &mut T) -> R,
@@ -20,10 +20,11 @@ where
}
}
/// Convert `Fn(&Config, &mut Service) -> Future<Service>` fn to a NewService
/// Service get constructor from NewService.
/// Convert `Fn(Config, &mut Service1) -> Future<Service2>` fn to a service factory
///
/// Service1 get constructed from `T` factory.
pub fn apply_cfg_factory<F, C, T, R, S>(
srv: T,
factory: T,
f: F,
) -> ApplyConfigServiceFactory<F, C, T, R, S>
where
@@ -34,12 +35,12 @@ where
S: Service,
{
ApplyConfigServiceFactory {
srv: Cell::new((srv, f)),
srv: Cell::new((factory, f)),
_t: PhantomData,
}
}
/// Convert `Fn(&Config) -> Future<Service>` fn to NewService\
/// Convert `Fn(Config, &mut Server) -> Future<Service>` fn to NewService\
pub struct ApplyConfigService<F, C, T, R, S, E>
where
F: FnMut(C, &mut T) -> R,

View File

@@ -13,7 +13,7 @@ pub type BoxService<Req, Res, Err> =
pub struct BoxServiceFactory<C, Req, Res, Err, InitErr>(Inner<C, Req, Res, Err, InitErr>);
/// Create boxed new service
/// Create boxed service factory
pub fn factory<T>(
factory: T,
) -> BoxServiceFactory<T::Config, T::Request, T::Response, T::Error, T::InitError>

View File

@@ -7,7 +7,7 @@ use futures_util::future::{ok, Ready};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Create `ServiceFactory` for function that can act as a `Service`
pub fn service_fn<F, Fut, Req, Res, Err, Cfg>(
pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
f: F,
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
where
@@ -17,16 +17,43 @@ where
FnServiceFactory::new(f)
}
pub fn service_fn2<F, Fut, Req, Res, Err>(f: F) -> FnService<F, Fut, Req, Res, Err>
where
F: FnMut(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
FnService::new(f)
}
/// Create `ServiceFactory` for function that can produce services
pub fn factory_fn<F, Cfg, Srv, Fut, Err>(f: F) -> FnServiceNoConfig<F, Cfg, Srv, Fut, Err>
///
/// # Example
///
/// ```rust
/// use std::io;
/// use actix_service::{fn_factory, fn_service, Service, ServiceFactory};
/// use futures_util::future::ok;
///
/// /// Service that divides two usize values.
/// async fn div((x, y): (usize, usize)) -> Result<usize, io::Error> {
/// if y == 0 {
/// Err(io::Error::new(io::ErrorKind::Other, "divide by zdro"))
/// } else {
/// Ok(x / y)
/// }
/// }
///
/// #[actix_rt::main]
/// async fn main() -> io::Result<()> {
/// // Create service factory that produces `div` services
/// let factory = fn_factory(|| {
/// ok::<_, io::Error>(fn_service(div))
/// });
///
/// // construct new service
/// let mut srv = factory.new_service(()).await?;
///
/// // now we can use `div` service
/// let result = srv.call((10, 20)).await?;
///
/// println!("10 / 20 = {}", result);
///
/// Ok(())
/// }
/// ```
pub fn fn_factory<F, Cfg, Srv, Fut, Err>(f: F) -> FnServiceNoConfig<F, Cfg, Srv, Fut, Err>
where
Srv: Service,
F: Fn() -> Fut,
@@ -35,8 +62,39 @@ where
FnServiceNoConfig::new(f)
}
/// Create `ServiceFactory` for function that can produce services with configuration
pub fn factory_fn_cfg<F, Fut, Cfg, Srv, Err>(f: F) -> FnServiceConfig<F, Fut, Cfg, Srv, Err>
/// Create `ServiceFactory` for function that accepts config argument and can produce services
///
/// Any function that has following form `Fn(Config) -> Future<Output = Service>` could
/// act as a `ServiceFactory`.
///
/// # Example
///
/// ```rust
/// use std::io;
/// use actix_service::{fn_factory_with_config, fn_service, Service, ServiceFactory};
/// use futures_util::future::ok;
///
/// #[actix_rt::main]
/// async fn main() -> io::Result<()> {
/// // Create service factory. factory uses config argument for
/// // services it generates.
/// let factory = fn_factory_with_config(|y: usize| {
/// ok::<_, io::Error>(fn_service(move |x: usize| ok::<_, io::Error>(x * y)))
/// });
///
/// // construct new service with config argument
/// let mut srv = factory.new_service(10).await?;
///
/// let result = srv.call(10).await?;
/// assert_eq!(result, 100);
///
/// println!("10 * 10 = {}", result);
/// Ok(())
/// }
/// ```
pub fn fn_factory_with_config<F, Fut, Cfg, Srv, Err>(
f: F,
) -> FnServiceConfig<F, Fut, Cfg, Srv, Err>
where
F: Fn(Cfg) -> Fut,
Fut: Future<Output = Result<Srv, Err>>,
@@ -132,6 +190,25 @@ where
}
}
impl<F, Fut, Req, Res, Err> Service for FnServiceFactory<F, Fut, Req, Res, Err, ()>
where
F: FnMut(Req) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
{
type Request = Req;
type Response = Res;
type Error = Err;
type Future = Fut;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
(self.f)(req)
}
}
impl<F, Fut, Req, Res, Err, Cfg> ServiceFactory for FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
where
F: FnMut(Req) -> Fut + Clone,
@@ -280,3 +357,47 @@ where
FnServiceNoConfig::new(self)
}
}
#[cfg(test)]
mod tests {
use std::task::Poll;
use futures_util::future::{lazy, ok};
use super::*;
use crate::{Service, ServiceFactory};
#[actix_rt::test]
async fn test_fn_service() {
let new_srv = fn_service(|()| ok::<_, ()>("srv"));
let mut srv = new_srv.new_service(()).await.unwrap();
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv");
}
#[actix_rt::test]
async fn test_fn_service_service() {
let mut srv = fn_service(|()| ok::<_, ()>("srv"));
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "srv");
}
#[actix_rt::test]
async fn test_fn_service_with_config() {
let new_srv = fn_factory_with_config(|cfg: usize| {
ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg))))
});
let mut srv = new_srv.new_service(1).await.unwrap();
let res = srv.call(()).await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv", 1));
}
}

View File

@@ -25,12 +25,48 @@ mod transform_err;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::apply_cfg::{apply_cfg, apply_cfg_factory};
pub use self::fn_service::{factory_fn, factory_fn_cfg, service_fn, service_fn2};
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::map_config::{map_config, unit_config};
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
pub use self::transform::{apply, Transform};
/// An asynchronous function from `Request` to a `Response`.
///
/// `Service` represents a service that represanting interation, taking requests and giving back
/// replies. You can think about service as a function with one argument and result as a return
/// type. In general form it looks like `async fn(Req) -> Result<Res, Err>`. `Service`
/// trait just generalizing form of this function. Each parameter described as an assotiated type.
///
/// Services provides a symmetric and uniform API, same abstractions represents
/// clients and servers. Services describe only `transforamtion` operation
/// which encorouge to simplify api surface and phrases `value transformation`.
/// That leads to simplier design of each service. That also allows better testability
/// and better composition.
///
/// Services could be represented in several different forms. In general,
/// Service is a type that implements `Service` trait.
///
/// ```rust,ignore
/// struct MyService;
///
/// impl Service for MyService {
/// type Request = u8;
/// type Response = u64;
/// type Error = MyError;
/// type Future = Pin<Box<Future<Output=Result<Self::Response, Self::Error>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... }
///
/// fn call(&mut self) -> Self::Future { ... }
/// }
/// ```
///
/// Service can have mutable state that influence computation.
/// This service could be rewritten as a simple function:
///
/// ```rust,ignore
/// async fn my_service(req: u8) -> Result<u64, MyError>;
/// ```
pub trait Service {
/// Requests handled by the service.
type Request;
@@ -53,6 +89,12 @@ pub trait Service {
/// This is a **best effort** implementation. False positives are permitted.
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
///
/// There are several notes to consider:
///
/// 1. `.poll_ready()` might be called on different task from actual service call.
///
/// 2. In case of chained services, `.poll_ready()` get called for all services at once.
fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Process the request and return the response asynchronously.
@@ -287,7 +329,7 @@ pub trait IntoServiceFactory<T>
where
T: ServiceFactory,
{
/// Convert `Self` an `ServiceFactory`
/// Convert `Self` to a `ServiceFactory`
fn into_factory(self) -> T;
}

View File

@@ -2,7 +2,10 @@ use std::marker::PhantomData;
use super::ServiceFactory;
/// Adapt external config to a config for provided new service
/// Adapt external config argument to a config for provided service factory
///
/// Note that this function consumes the receiving service factory and returns
/// a wrapped version of it.
pub fn map_config<T, F, C>(factory: T, f: F) -> MapConfig<T, F, C>
where
T: ServiceFactory,

View File

@@ -9,6 +9,7 @@ use crate::map_init_err::MapInitErr;
use crate::then::{ThenService, ThenServiceFactory};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Contruct new pipeline with one service in pipeline chain.
pub fn pipeline<F, T>(service: F) -> Pipeline<T>
where
F: IntoService<T>,
@@ -19,6 +20,7 @@ where
}
}
/// Contruct new pipeline factory with one service factory.
pub fn pipeline_factory<T, F>(factory: F) -> PipelineFactory<T>
where
T: ServiceFactory,
@@ -29,7 +31,7 @@ where
}
}
/// Pipeline service
/// Pipeline service - pipeline allows to compose multiple service into one service.
pub struct Pipeline<T> {
service: T,
}
@@ -159,7 +161,7 @@ impl<T: Service> Service for Pipeline<T> {
}
}
/// Pipeline constructor
/// Pipeline factory
pub struct PipelineFactory<T> {
factory: T,
}

View File

@@ -7,23 +7,88 @@ use std::task::{Context, Poll};
use crate::transform_err::TransformMapInitErr;
use crate::{IntoServiceFactory, Service, ServiceFactory};
/// Apply transform to a service. Function returns
/// services factory that in initialization creates
/// service and applies transform to this service.
pub fn apply<T, S, U>(t: T, service: U) -> ApplyTransform<T, S>
/// Apply transform to a service.
pub fn apply<T, S, U>(t: T, factory: U) -> ApplyTransform<T, S>
where
S: ServiceFactory,
T: Transform<S::Service, InitError = S::InitError>,
U: IntoServiceFactory<S>,
{
ApplyTransform::new(t, service.into_factory())
ApplyTransform::new(t, factory.into_factory())
}
/// The `Transform` trait defines the interface of a service factory that wraps inner service
/// during construction.
///
/// Transform(middleware) wraps inner service and runs during
/// inbound and/or outbound processing in the request/response lifecycle.
/// It may modify request and/or response.
///
/// For example, timeout transform:
///
/// ```rust,ignore
/// pub struct Timeout<S> {
/// service: S,
/// timeout: Duration,
/// }
///
/// impl<S> Service for Timeout<S>
/// where
/// S: Service,
/// {
/// type Request = S::Request;
/// type Response = S::Response;
/// type Error = TimeoutError<S::Error>;
/// type Future = TimeoutServiceResponse<S>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// ready!(self.service.poll_ready(cx)).map_err(TimeoutError::Service)
/// }
///
/// fn call(&mut self, req: S::Request) -> Self::Future {
/// TimeoutServiceResponse {
/// fut: self.service.call(req),
/// sleep: Delay::new(clock::now() + self.timeout),
/// }
/// }
/// }
/// ```
///
/// Timeout service in above example is decoupled from underlying service implementation
/// and could be applied to any service.
///
/// The `Transform` trait defines the interface of a Service factory. `Transform`
/// is often implemented for middleware, defining how to construct a
/// middleware Service. A Service that is constructed by the factory takes
/// the Service that follows it during execution as a parameter, assuming
/// ownership of the next Service.
///
/// Factory for `Timeout` middleware from the above example could look like this:
///
/// ```rust,,ignore
/// pub struct TimeoutTransform {
/// timeout: Duration,
/// }
///
/// impl<S> Transform<S> for TimeoutTransform<E>
/// where
/// S: Service,
/// {
/// type Request = S::Request;
/// type Response = S::Response;
/// type Error = TimeoutError<S::Error>;
/// type InitError = S::Error;
/// type Transform = Timeout<S>;
/// type Future = Ready<Result<Self::Transform, Self::InitError>>;
///
/// fn new_transform(&self, service: S) -> Self::Future {
/// ok(TimeoutService {
/// service,
/// timeout: self.timeout,
/// })
/// }
/// }
/// ```
pub trait Transform<S> {
/// Requests handled by the service.
type Request;
@@ -41,13 +106,13 @@ pub trait Transform<S> {
Error = Self::Error,
>;
/// Errors produced while building a service.
/// Errors produced while building a transform service.
type InitError;
/// The future response value.
type Future: Future<Output = Result<Self::Transform, Self::InitError>>;
/// Creates and returns a new Service component, asynchronously
/// Creates and returns a new Transform component, asynchronously
fn new_transform(&self, service: S) -> Self::Future;
/// Map this transforms's factory error to a different error,

View File

@@ -20,12 +20,12 @@ pub use actix_macros::test;
/// # Examples
///
/// ```rust
/// use actix_service::{service_fn};
/// use actix_service::fn_service;
/// use actix_testing::TestServer;
///
/// #[actix_rt::main]
/// async fn main() {
/// let srv = TestServer::with(|| service_fn(
/// let srv = TestServer::with(|| fn_service(
/// |sock| async move {
/// println!("New connection: {:?}", sock);
/// Ok::<_, ()>(())

View File

@@ -33,7 +33,7 @@ nativetls = ["native-tls", "tokio-tls"]
[dependencies]
actix-service = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.3"
actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.3"
derive_more = "0.99.2"

View File

@@ -3,7 +3,7 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
pub use open_ssl::ssl::{SslAcceptor, SslAcceptorBuilder};
pub use open_ssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
pub use tokio_openssl::{HandshakeError, SslStream};
use actix_codec::{AsyncRead, AsyncWrite};

View File

@@ -20,7 +20,7 @@ path = "src/lib.rs"
[dependencies]
actix-service = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.3"
actix-codec = "0.2.0"
bytes = "0.5.2"
either = "1.5.2"
futures = "0.3.1"

View File

@@ -115,7 +115,7 @@ mod tests {
use std::time::Duration;
use super::*;
use actix_service::{apply, factory_fn, Service, ServiceFactory};
use actix_service::{apply, fn_factory, Service, ServiceFactory};
use futures::future::{lazy, ok, FutureExt, LocalBoxFuture};
struct SleepService(Duration);
@@ -155,7 +155,7 @@ mod tests {
async fn test_newtransform() {
let wait_time = Duration::from_millis(50);
let srv = apply(InFlight::new(1), factory_fn(|| ok(SleepService(wait_time))));
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
let mut srv = srv.new_service(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

View File

@@ -182,7 +182,7 @@ mod tests {
use std::time::Duration;
use super::*;
use actix_service::{apply, factory_fn, Service, ServiceFactory};
use actix_service::{apply, fn_factory, Service, ServiceFactory};
use futures::future::{ok, FutureExt, LocalBoxFuture};
struct SleepService(Duration);
@@ -229,7 +229,7 @@ mod tests {
let timeout = apply(
Timeout::new(resolution),
factory_fn(|| ok::<_, ()>(SleepService(wait_time))),
fn_factory(|| ok::<_, ()>(SleepService(wait_time))),
);
let mut srv = timeout.new_service(&()).await.unwrap();

View File

@@ -1,5 +1,9 @@
# Changes
[0.1.1] - 2019-12-07
* Fix hash impl
[0.1.0] - 2019-12-07
* Initial release

View File

@@ -1,6 +1,6 @@
[package]
name = "bytestring"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "A UTF-8 encoded string with Bytes as a storage"
keywords = ["actix"]

View File

@@ -45,7 +45,7 @@ impl PartialEq<str> for ByteString {
impl hash::Hash for ByteString {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.0.hash(state);
(**self).hash(state);
}
}
@@ -144,6 +144,19 @@ impl fmt::Display for ByteString {
#[cfg(test)]
mod test {
use super::*;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
#[test]
fn test_hash() {
let mut hasher1 = DefaultHasher::default();
"str".hash(&mut hasher1);
let mut hasher2 = DefaultHasher::default();
let s = ByteString::from_static("str");
s.hash(&mut hasher2);
assert_eq!(hasher1.finish(), hasher2.finish());
}
#[test]
fn test_from_string() {