1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-12 05:17:14 +02:00

Compare commits

..

20 Commits

Author SHA1 Message Date
Nikolay Kim
a4e0c71baa Merge branch 'master' of github.com:actix/actix-net 2019-04-04 15:41:50 -07:00
Nikolay Kim
b9ea445e70 Log error if dns system config could not be loaded 2019-04-04 15:41:05 -07:00
Nikolay Kim
ba2901269d Merge pull request #11 from Dowwie/master
added docs for trait Service, trait Transform
2019-04-04 11:06:02 -07:00
dowwie
5cbc29306a updated as per comments 2019-04-04 14:02:53 -04:00
Nikolay Kim
810fa869ae remove unneeded static 2019-04-04 10:04:19 -07:00
dowwie
33cd51aabf added docs for trait Service, trait Transform 2019-04-04 11:40:28 -04:00
Nikolay Kim
629ed05f82 Get dynamic segment by name instead of iterator 2019-04-03 21:40:21 -07:00
Nikolay Kim
5e8ae210f7 Rename connect Connector to TcpConnector #10 2019-03-31 19:14:13 -07:00
Nikolay Kim
3add90628f Fix SIGINT force shutdown 2019-03-30 12:09:02 -07:00
Nikolay Kim
02ab804e0b prepare actix-service release 2019-03-29 11:16:40 -07:00
Nikolay Kim
feac0b43d9 add impl Service for Rc<RefCell<S>> 2019-03-29 10:21:17 -07:00
Nikolay Kim
1441355d4f use release 2019-03-28 04:02:39 -07:00
Nikolay Kim
7c5afc09a6 move threadpool to separate crate 2019-03-28 03:56:52 -07:00
Nikolay Kim
16856c7d3f Merge branch 'master' of github.com:actix/actix-net 2019-03-27 17:30:54 -07:00
Nikolay Kim
95d02659d5 Added Framed::map_io() method 2019-03-27 17:30:37 -07:00
Juan Aguilar Santillana
bcbd7e6ddf Fix unnecessary arbiter clone at builder rt 2019-03-23 09:46:08 +03:00
Nikolay Kim
e0d3581239 allow to send messages to framed transport via mpsc channel 2019-03-20 09:44:23 -07:00
Nikolay Kim
ef1bdb2eb2 update travis config 2019-03-17 10:25:24 -07:00
Nikolay Kim
10301ff49d temp tarpaulin fix 2019-03-17 08:53:50 -07:00
Nikolay Kim
27c28d6597 Fix error handling for single address 2019-03-15 11:37:51 -07:00
33 changed files with 300 additions and 108 deletions

View File

@@ -37,36 +37,13 @@ script:
- |
if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-03-02" ]]; then
cargo clean
cargo test --features="ssl,tls,rust-tls" -- --nocapture
cd actix-codec && cargo test && cd ..
cd actix-service && cargo test && cd ..
cd actix-server && cargo test --all-features -- --nocapture && cd ..
cd actix-rt && cargo test && cd ..
cd actix-connect && cargo test && cd ..
cd actix-utils && cargo test && cd ..
cd router && cargo test && cd ..
cargo test --all --all-features -- --nocapture
fi
after_success:
- |
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then
#cd actix-service && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
#cd actix-rt && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
#cd actix-connect && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
#cd actix-codec && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
#cd actix-server && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
#cd actix-utils && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
#cd router && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
cd actix-service && cargo tarpaulin --out Xml && cd ..
#cd actix-rt && cargo tarpaulin --out Xml && cd ..
cd actix-connect && cargo tarpaulin --out Xml && cd ..
#cd actix-codec && cargo tarpaulin --out Xml && cd ..
#cd actix-server && cargo tarpaulin --out Xml && cd ..
cd actix-utils && cargo tarpaulin --out Xml && cd ..
cd router && cargo tarpaulin --out Xml && cd ..
# cargo tarpaulin --all --all-features --out Xml
taskset -c 0 cargo tarpaulin --all --all-features --out Xml
echo "Uploaded code coverage"
bash <(curl -s https://codecov.io/bash)
fi

View File

@@ -22,6 +22,7 @@ members = [
"actix-server",
"actix-server-config",
"actix-test-server",
"actix-threadpool",
"actix-utils",
"router",
]

View File

@@ -1,6 +1,11 @@
# Changes
## [0.1.0] - 2019-03-06
## [0.1.2] - 2019-03-27
* Added `Framed::map_io()` method.
## [0.1.1] - 2019-03-06
* Added `FramedParts::with_read_buffer()` method.

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-codec"
version = "0.1.1"
version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
@@ -11,15 +11,15 @@ categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = "../"
workspace = ".."
[lib]
name = "actix_codec"
path = "src/lib.rs"
[dependencies]
bytes = "0.4"
bytes = "0.4.12"
futures = "0.1.24"
tokio-io = "0.1"
tokio-codec = "0.1"
tokio-io = "0.1.12"
tokio-codec = "0.1.1"
log = "0.4"

View File

@@ -167,6 +167,22 @@ impl<T, U> Framed<T, U> {
}
}
/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
let (inner, read_buf) = self.inner.into_parts();
let (inner, write_buf, lw, hw) = inner.into_parts();
Framed {
inner: framed_read2_with_buffer(
framed_write2_with_buffer(Fuse(f(inner.0), inner.1), write_buf, lw, hw),
read_buf,
),
}
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
where

View File

@@ -1,5 +1,23 @@
# Changes
## [0.1.2] - 2019-04-04
### Added
* Log error if dns system config could not be loaded.
### Changed
* Rename connect Connector to TcpConnector #10
## [0.1.1] - 2019-03-15
### Fixed
* Fix error handling for single address
## [0.1.0] - 2019-03-14
* Refactor resolver and connector services

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-connect"
version = "0.1.0"
version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -12,54 +12,54 @@ use super::error::ConnectError;
/// Tcp connector service factory
#[derive(Debug)]
pub struct ConnectorFactory<T>(PhantomData<T>);
pub struct TcpConnectorFactory<T>(PhantomData<T>);
impl<T> ConnectorFactory<T> {
impl<T> TcpConnectorFactory<T> {
pub fn new() -> Self {
ConnectorFactory(PhantomData)
TcpConnectorFactory(PhantomData)
}
}
impl<T> Clone for ConnectorFactory<T> {
impl<T> Clone for TcpConnectorFactory<T> {
fn clone(&self) -> Self {
ConnectorFactory(PhantomData)
TcpConnectorFactory(PhantomData)
}
}
impl<T: Address> NewService for ConnectorFactory<T> {
impl<T: Address> NewService for TcpConnectorFactory<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Service = Connector<T>;
type Service = TcpConnector<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(Connector(PhantomData))
ok(TcpConnector(PhantomData))
}
}
/// Tcp connector service
#[derive(Debug)]
pub struct Connector<T>(PhantomData<T>);
pub struct TcpConnector<T>(PhantomData<T>);
impl<T> Connector<T> {
impl<T> TcpConnector<T> {
pub fn new() -> Self {
Connector(PhantomData)
TcpConnector(PhantomData)
}
}
impl<T> Clone for Connector<T> {
impl<T> Clone for TcpConnector<T> {
fn clone(&self) -> Self {
Connector(PhantomData)
TcpConnector(PhantomData)
}
}
impl<T: Address> Service for Connector<T> {
impl<T: Address> Service for TcpConnector<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
type Future = Either<ConnectorResponse<T>, FutureResult<Self::Response, Self::Error>>;
type Future = Either<TcpConnectorResponse<T>, FutureResult<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
@@ -70,7 +70,7 @@ impl<T: Address> Service for Connector<T> {
let Connect { req, addr, .. } = req;
if let Some(addr) = addr {
Either::A(ConnectorResponse::new(req, port, addr))
Either::A(TcpConnectorResponse::new(req, port, addr))
} else {
error!("TCP connector: got unresolved address");
Either::B(err(ConnectError::Unresolverd))
@@ -80,19 +80,19 @@ impl<T: Address> Service for Connector<T> {
#[doc(hidden)]
/// Tcp stream connector response future
pub struct ConnectorResponse<T> {
pub struct TcpConnectorResponse<T> {
req: Option<T>,
port: u16,
addrs: Option<VecDeque<SocketAddr>>,
stream: Option<ConnectFuture>,
}
impl<T: Address> ConnectorResponse<T> {
impl<T: Address> TcpConnectorResponse<T> {
pub fn new(
req: T,
port: u16,
addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
) -> ConnectorResponse<T> {
) -> TcpConnectorResponse<T> {
trace!(
"TCP connector - connecting to {:?} port:{}",
req.host(),
@@ -100,13 +100,13 @@ impl<T: Address> ConnectorResponse<T> {
);
match addr {
either::Either::Left(addr) => ConnectorResponse {
either::Either::Left(addr) => TcpConnectorResponse {
req: Some(req),
port,
addrs: None,
stream: Some(TcpStream::connect(&addr)),
},
either::Either::Right(addrs) => ConnectorResponse {
either::Either::Right(addrs) => TcpConnectorResponse {
req: Some(req),
port,
addrs: Some(addrs),
@@ -116,7 +116,7 @@ impl<T: Address> ConnectorResponse<T> {
}
}
impl<T: Address> Future for ConnectorResponse<T> {
impl<T: Address> Future for TcpConnectorResponse<T> {
type Item = Connection<T, TcpStream>;
type Error = ConnectError;
@@ -140,7 +140,7 @@ impl<T: Address> Future for ConnectorResponse<T> {
self.req.as_ref().unwrap().host(),
self.port,
);
if self.addrs.as_ref().unwrap().is_empty() {
if self.addrs.is_none() || self.addrs.as_ref().unwrap().is_empty() {
return Err(err.into());
}
}

View File

@@ -20,7 +20,7 @@ mod uri;
pub use trust_dns_resolver::{error::ResolveError, AsyncResolver};
pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{Connector, ConnectorFactory};
pub use self::connector::{TcpConnector, TcpConnectorFactory};
pub use self::error::ConnectError;
pub use self::resolver::{Resolver, ResolverFactory};
@@ -29,6 +29,17 @@ use tokio_tcp::TcpStream;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::system_conf::read_system_conf;
#[doc(hidden)]
#[deprecated(since = "0.1.2", note = "please use `actix_connect::TcpConnector`")]
pub type Connector<T> = TcpConnector<T>;
#[doc(hidden)]
#[deprecated(
since = "0.1.2",
note = "please use `actix_connect::TcpConnectorFactory`"
)]
pub type ConnectorFactory<T> = TcpConnectorFactory<T>;
pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver {
let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg);
@@ -36,10 +47,12 @@ pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver
}
pub fn start_default_resolver() -> AsyncResolver {
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
(cfg, opts)
} else {
(ResolverConfig::default(), ResolverOpts::default())
let (cfg, opts) = match read_system_conf() {
Ok((cfg, opts)) => (cfg, opts),
Err(e) => {
log::error!("TRust-DNS can not load system config: {}", e);
(ResolverConfig::default(), ResolverOpts::default())
}
};
let (resolver, bg) = AsyncResolver::new(cfg, opts);
@@ -52,7 +65,7 @@ pub fn new_connector<T: Address>(
resolver: AsyncResolver,
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone {
Resolver::new(resolver).and_then(Connector::new())
Resolver::new(resolver).and_then(TcpConnector::new())
}
/// Create tcp connector service
@@ -64,14 +77,14 @@ pub fn new_connector_factory<T: Address>(
Error = ConnectError,
InitError = (),
> + Clone {
ResolverFactory::new(resolver).and_then(ConnectorFactory::new())
ResolverFactory::new(resolver).and_then(TcpConnectorFactory::new())
}
/// Create connector service with default parameters
pub fn default_connector<T: Address>(
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone {
Resolver::new(start_default_resolver()).and_then(Connector::new())
Resolver::new(start_default_resolver()).and_then(TcpConnector::new())
}
/// Create connector service factory with default parameters
@@ -81,5 +94,5 @@ pub fn default_connector_factory<T: Address>() -> impl NewService<
Error = ConnectError,
InitError = (),
> + Clone {
ResolverFactory::new(start_default_resolver()).and_then(ConnectorFactory::new())
ResolverFactory::new(start_default_resolver()).and_then(TcpConnectorFactory::new())
}

View File

@@ -153,7 +153,9 @@ impl<T: Address> Future for ResolverFuture<T> {
req.host(),
addrs
);
if addrs.len() == 1 {
if addrs.is_empty() {
Err(ConnectError::NoRecords)
} else if addrs.len() == 1 {
req.addr = Some(either::Either::Left(addrs.pop_front().unwrap()));
Ok(Async::Ready(req))
} else {

View File

@@ -26,6 +26,7 @@ fn port(scheme: Option<&str>) -> Option<u16> {
"wss" => Some(443),
"amqp" => Some(5672),
"amqps" => Some(5671),
"sb" => Some(5671),
"mqtt" => Some(1883),
"mqtts" => Some(8883),
_ => None,

View File

@@ -43,13 +43,22 @@ fn test_static_str() {
))
.unwrap();
let mut conn = srv
.block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver))))
.block_on(lazy(|| {
Ok::<_, ()>(actix_connect::new_connector(resolver.clone()))
}))
.unwrap();
let con = srv
.block_on(conn.call(Connect::with("10", srv.addr())))
.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
let connect = Connect::new(srv.host().to_owned());
let mut conn = srv
.block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver))))
.unwrap();
let con = srv.block_on(conn.call(connect));
assert!(con.is_err());
}
#[test]

View File

@@ -1,5 +1,11 @@
# Changes
## [0.2.2] - 2019-03-28
### Changed
* Moved `blocking` module to `actix-threadpool` crate
## [0.2.1] - 2019-03-11
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-rt"
version = "0.2.1"
version = "0.2.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"]
@@ -11,21 +11,15 @@ categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = "../"
workspace = ".."
[lib]
name = "actix_rt"
path = "src/lib.rs"
[dependencies]
bytes = "0.4"
derive_more = "0.14"
actix-threadpool = "0.1.0"
futures = "0.1.25"
parking_lot = "0.7"
lazy_static = "1.2"
log = "0.4"
num_cpus = "1.10"
threadpool = "1.7"
tokio-current-thread = "0.1"
tokio-executor = "0.1.5"
tokio-reactor = "0.1.7"

View File

@@ -86,8 +86,7 @@ impl Builder {
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let arbiter = Arbiter::new_system();
let system = System::construct(sys_sender, arbiter.clone(), self.stop_on_panic);
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);

View File

@@ -1,7 +1,6 @@
//! A runtime implementation that runs everything on the current thread.
mod arbiter;
pub mod blocking;
mod builder;
mod runtime;
mod system;
@@ -11,6 +10,9 @@ pub use self::builder::{Builder, SystemRunner};
pub use self::runtime::Runtime;
pub use self::system::System;
#[doc(hidden)]
pub use actix_threadpool as blocking;
/// Spawns a future on the current arbiter.
///
/// # Panics

View File

@@ -1,5 +1,12 @@
# Changes
## [0.4.2] - 2019-03-30
### Fixed
* Fix SIGINT force shutdown
## [0.4.1] - 2019-03-14
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "0.4.1"
version = "0.4.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -328,7 +328,7 @@ impl ServerBuilder {
self.accept.send(Command::Stop);
// stop workers
if !self.workers.is_empty() {
if !self.workers.is_empty() && graceful {
spawn(
futures_unordered(
self.workers

View File

@@ -1,5 +1,12 @@
# Changes
## [0.3.5] - 2019-03-29
### Added
* Add `impl<S: Service> Service for Rc<RefCell<S>>`
## [0.3.4] - 2019-03-12
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-service"
version = "0.3.4"
version = "0.3.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
@@ -391,6 +392,24 @@ where
}
}
impl<S> Service for Rc<RefCell<S>>
where
S: Service,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), S::Error> {
self.borrow_mut().poll_ready()
}
fn call(&mut self, request: Self::Request) -> S::Future {
self.borrow_mut().call(request)
}
}
impl<S, C> NewService<C> for Rc<S>
where
S: NewService<C>,

View File

@@ -6,9 +6,16 @@ use futures::{Async, Future, IntoFuture, Poll};
use crate::transform_err::{TransformFromErr, TransformMapInitErr};
use crate::{IntoNewService, NewService, Service};
/// `Transform` service factory.
/// The `Transform` trait defines the interface of a Service factory. `Transform`
/// is often implemented for middleware, defining how to manufacture a
/// middleware Service. A Service that is manufactured by the factory takes
/// the Service that follows it during execution as a parameter, assuming
/// ownership of the next Service. A Service can be a variety of types, such
/// as (but not limited to) another middleware Service, an extractor Service,
/// other helper Services, or the request handler endpoint Service.
///
/// A Service is created by the factory during server initialization.
///
/// Transform factory creates service that wraps other services.
/// `Config` is a service factory configuration type.
pub trait Transform<S> {
/// Requests handled by the service.
@@ -33,7 +40,7 @@ pub trait Transform<S> {
/// The future response value.
type Future: Future<Item = Self::Transform, Error = Self::InitError>;
/// Create and return a new service value asynchronously.
/// Creates and returns a new Service component, asynchronously
fn new_transform(&self, service: S) -> Self::Future;
/// Map this service's factory error to a different error,

View File

@@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2019-03-28
* Move threadpool to separate crate

View File

@@ -0,0 +1,27 @@
[package]
name = "actix-threadpool"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix thread pool for sync code"
keywords = ["actix", "network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-threadpool/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = ".."
[lib]
name = "actix_threadpool"
path = "src/lib.rs"
[dependencies]
derive_more = "0.14"
futures = "0.1.25"
parking_lot = "0.7"
lazy_static = "1.2"
log = "0.4"
num_cpus = "1.10"
threadpool = "1.7"

View File

@@ -9,7 +9,7 @@ use parking_lot::Mutex;
use threadpool::ThreadPool;
/// Env variable for default cpu pool size
const ENV_CPU_POOL_VAR: &str = "ACTIX_CPU_POOL";
const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL";
lazy_static::lazy_static! {
pub(crate) static ref DEFAULT_POOL: Mutex<ThreadPool> = {
@@ -18,7 +18,7 @@ lazy_static::lazy_static! {
if let Ok(val) = val.parse() {
val
} else {
log::error!("Can not parse ACTIX_CPU_POOL value");
log::error!("Can not parse ACTIX_THREADPOOL value");
num_cpus::get() * 5
}
}

View File

@@ -1,5 +1,17 @@
# Changes
## [0.3.5] - 2019-04-04
### Added
* Allow to send messages to `FramedTransport` via mpsc channel.
### Changed
* Remove 'static constraint from Clonable service
## [0.3.4] - 2019-03-12
### Changed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-utils"
version = "0.3.4"
version = "0.3.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -7,12 +7,12 @@ use futures::Poll;
use super::cell::Cell;
/// Service that allows to turn non-clone service to a service with `Clone` impl
pub struct CloneableService<T: 'static> {
pub struct CloneableService<T> {
service: Cell<T>,
_t: PhantomData<Rc<()>>,
}
impl<T: 'static> CloneableService<T> {
impl<T> CloneableService<T> {
pub fn new(service: T) -> Self
where
T: Service,
@@ -24,7 +24,7 @@ impl<T: 'static> CloneableService<T> {
}
}
impl<T: 'static> Clone for CloneableService<T> {
impl<T> Clone for CloneableService<T> {
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
@@ -35,7 +35,7 @@ impl<T: 'static> Clone for CloneableService<T> {
impl<T> Service for CloneableService<T>
where
T: Service + 'static,
T: Service,
{
type Request = T::Request;
type Response = T::Response;

View File

@@ -7,6 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, FutureResult};
use futures::task::AtomicTask;
use futures::unsync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Sink, Stream};
use log::debug;
@@ -178,6 +179,11 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> {
}
}
pub enum FramedMessage<T> {
Message(T),
Close,
}
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
pub struct FramedTransport<S, T, U>
@@ -193,6 +199,7 @@ where
service: S,
state: TransportState<S, U>,
framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
}
@@ -200,6 +207,7 @@ enum TransportState<S: Service, U: Encoder + Decoder> {
Processing,
Error(FramedTransportError<S::Error, U>),
FramedError(FramedTransportError<S::Error, U>),
FlushAndStop,
Stopping,
}
@@ -257,10 +265,12 @@ where
/// write to framed object
fn poll_write(&mut self) -> bool {
let inner = self.inner.get_mut();
let mut rx_done = self.rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !self.framed.is_write_buf_full() {
if let Some(msg) = inner.buf.pop_front() {
match msg {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = self.framed.force_send(msg) {
self.state = TransportState::FramedError(
@@ -268,6 +278,7 @@ where
);
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
self.state =
@@ -275,7 +286,33 @@ where
return true;
}
}
} else {
}
if !rx_done && self.rx.is_some() {
match self.rx.as_mut().unwrap().poll() {
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
if let Err(err) = self.framed.force_send(msg) {
self.state = TransportState::FramedError(
FramedTransportError::Encoder(err),
);
return true;
}
}
Ok(Async::Ready(Some(FramedMessage::Close))) => {
self.state = TransportState::FlushAndStop;
return true;
}
Ok(Async::Ready(None)) => {
let _ = self.rx.take();
}
Ok(Async::NotReady) => rx_done = true,
Err(_e) => {
let _ = self.rx.take();
}
}
}
if rx_done && buf_empty {
break;
}
}
@@ -313,6 +350,7 @@ where
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
FramedTransport {
framed,
rx: None,
service: service.into_service(),
state: TransportState::Processing,
inner: Cell::new(FramedTransportInner {
@@ -322,6 +360,15 @@ where
}
}
/// Get Sender
pub fn set_receiver(
mut self,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
) -> Self {
self.rx = Some(rx);
self
}
/// Get reference to a service wrapped by `FramedTransport` instance.
pub fn get_ref(&self) -> &S {
&self.service
@@ -378,6 +425,20 @@ where
Ok(Async::NotReady)
}
}
TransportState::FlushAndStop => {
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Err(err) => {
debug!("Error sending data: {:?}", err);
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
}
} else {
Ok(Async::Ready(()))
}
}
TransportState::FramedError(err) => Err(err),
TransportState::Stopping => Ok(Async::Ready(())),
}

View File

@@ -1,5 +1,9 @@
# Changes
## [0.1.1] - 2019-04-03
* Get dynamic segment by name instead of iterator.
## [0.1.0] - 2019-03-09
* Initial release

View File

@@ -1,16 +1,16 @@
[package]
name = "actix-router"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Path router"
keywords = ["actix"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://actix.rs/api/actix-net/stable/actix_router/"
documentation = "https://docs.rs/actix-router/"
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = "../"
workspace = ".."
[lib]
name = "actix_router"
@@ -23,7 +23,8 @@ default = ["http"]
bytes = "0.4"
regex = "1.0"
serde = "1.0.80"
string = "0.1.3"
string = "0.2.0"
log = "0.4"
http = { version="0.1.14", optional=true }
[dev-dependencies]

View File

@@ -196,18 +196,17 @@ impl ResourceDef {
[PathItem::Static(""); MAX_DYNAMIC_SEGMENTS];
if let Some(captures) = re.captures(path.path()) {
let mut passed = false;
for capture in captures.iter() {
if let Some(ref m) = capture {
if !passed {
passed = true;
continue;
}
segments[idx] = PathItem::Segment(m.start() as u16, m.end() as u16);
for (no, name) in names.iter().enumerate() {
if let Some(m) = captures.name(&name) {
idx += 1;
pos = m.end();
segments[no] = PathItem::Segment(m.start() as u16, m.end() as u16);
} else {
log::error!(
"Dynamic path match but not all segments found: {}",
name
);
false;
}
}
} else {