mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-13 06:08:21 +02:00
Compare commits
12 Commits
connect-v0
...
server-v0.
Author | SHA1 | Date | |
---|---|---|---|
|
3add90628f | ||
|
02ab804e0b | ||
|
feac0b43d9 | ||
|
1441355d4f | ||
|
7c5afc09a6 | ||
|
16856c7d3f | ||
|
95d02659d5 | ||
|
bcbd7e6ddf | ||
|
e0d3581239 | ||
|
ef1bdb2eb2 | ||
|
10301ff49d | ||
|
27c28d6597 |
27
.travis.yml
27
.travis.yml
@@ -37,36 +37,13 @@ script:
|
||||
- |
|
||||
if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-03-02" ]]; then
|
||||
cargo clean
|
||||
cargo test --features="ssl,tls,rust-tls" -- --nocapture
|
||||
cd actix-codec && cargo test && cd ..
|
||||
cd actix-service && cargo test && cd ..
|
||||
cd actix-server && cargo test --all-features -- --nocapture && cd ..
|
||||
cd actix-rt && cargo test && cd ..
|
||||
cd actix-connect && cargo test && cd ..
|
||||
cd actix-utils && cargo test && cd ..
|
||||
cd router && cargo test && cd ..
|
||||
cargo test --all --all-features -- --nocapture
|
||||
fi
|
||||
|
||||
after_success:
|
||||
- |
|
||||
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then
|
||||
#cd actix-service && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
||||
#cd actix-rt && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
||||
#cd actix-connect && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
||||
#cd actix-codec && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
||||
#cd actix-server && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
||||
#cd actix-utils && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
||||
#cd router && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
||||
|
||||
cd actix-service && cargo tarpaulin --out Xml && cd ..
|
||||
#cd actix-rt && cargo tarpaulin --out Xml && cd ..
|
||||
cd actix-connect && cargo tarpaulin --out Xml && cd ..
|
||||
#cd actix-codec && cargo tarpaulin --out Xml && cd ..
|
||||
#cd actix-server && cargo tarpaulin --out Xml && cd ..
|
||||
cd actix-utils && cargo tarpaulin --out Xml && cd ..
|
||||
cd router && cargo tarpaulin --out Xml && cd ..
|
||||
|
||||
# cargo tarpaulin --all --all-features --out Xml
|
||||
taskset -c 0 cargo tarpaulin --all --all-features --out Xml
|
||||
echo "Uploaded code coverage"
|
||||
bash <(curl -s https://codecov.io/bash)
|
||||
fi
|
||||
|
@@ -22,6 +22,7 @@ members = [
|
||||
"actix-server",
|
||||
"actix-server-config",
|
||||
"actix-test-server",
|
||||
"actix-threadpool",
|
||||
"actix-utils",
|
||||
"router",
|
||||
]
|
||||
|
@@ -1,6 +1,11 @@
|
||||
# Changes
|
||||
|
||||
## [0.1.0] - 2019-03-06
|
||||
## [0.1.2] - 2019-03-27
|
||||
|
||||
* Added `Framed::map_io()` method.
|
||||
|
||||
|
||||
## [0.1.1] - 2019-03-06
|
||||
|
||||
* Added `FramedParts::with_read_buffer()` method.
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-codec"
|
||||
version = "0.1.1"
|
||||
version = "0.1.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Utilities for encoding and decoding frames"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
@@ -11,15 +11,15 @@ categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
edition = "2018"
|
||||
workspace = "../"
|
||||
workspace = ".."
|
||||
|
||||
[lib]
|
||||
name = "actix_codec"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
bytes = "0.4"
|
||||
bytes = "0.4.12"
|
||||
futures = "0.1.24"
|
||||
tokio-io = "0.1"
|
||||
tokio-codec = "0.1"
|
||||
tokio-io = "0.1.12"
|
||||
tokio-codec = "0.1.1"
|
||||
log = "0.4"
|
@@ -167,6 +167,22 @@ impl<T, U> Framed<T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume the `Frame`, returning `Frame` with different io.
|
||||
pub fn map_io<F, T2>(self, f: F) -> Framed<T2, U>
|
||||
where
|
||||
F: Fn(T) -> T2,
|
||||
{
|
||||
let (inner, read_buf) = self.inner.into_parts();
|
||||
let (inner, write_buf, lw, hw) = inner.into_parts();
|
||||
|
||||
Framed {
|
||||
inner: framed_read2_with_buffer(
|
||||
framed_write2_with_buffer(Fuse(f(inner.0), inner.1), write_buf, lw, hw),
|
||||
read_buf,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume the `Frame`, returning `Frame` with different codec.
|
||||
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
|
||||
where
|
||||
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.1.1] - 2019-03-15
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix error handling for single address
|
||||
|
||||
|
||||
## [0.1.0] - 2019-03-14
|
||||
|
||||
* Refactor resolver and connector services
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-connect"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix Connector - tcp connector service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -140,7 +140,7 @@ impl<T: Address> Future for ConnectorResponse<T> {
|
||||
self.req.as_ref().unwrap().host(),
|
||||
self.port,
|
||||
);
|
||||
if self.addrs.as_ref().unwrap().is_empty() {
|
||||
if self.addrs.is_none() || self.addrs.as_ref().unwrap().is_empty() {
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
|
@@ -153,7 +153,9 @@ impl<T: Address> Future for ResolverFuture<T> {
|
||||
req.host(),
|
||||
addrs
|
||||
);
|
||||
if addrs.len() == 1 {
|
||||
if addrs.is_empty() {
|
||||
Err(ConnectError::NoRecords)
|
||||
} else if addrs.len() == 1 {
|
||||
req.addr = Some(either::Either::Left(addrs.pop_front().unwrap()));
|
||||
Ok(Async::Ready(req))
|
||||
} else {
|
||||
|
@@ -26,6 +26,7 @@ fn port(scheme: Option<&str>) -> Option<u16> {
|
||||
"wss" => Some(443),
|
||||
"amqp" => Some(5672),
|
||||
"amqps" => Some(5671),
|
||||
"sb" => Some(5671),
|
||||
"mqtt" => Some(1883),
|
||||
"mqtts" => Some(8883),
|
||||
_ => None,
|
||||
|
@@ -43,13 +43,22 @@ fn test_static_str() {
|
||||
))
|
||||
.unwrap();
|
||||
let mut conn = srv
|
||||
.block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver))))
|
||||
.block_on(lazy(|| {
|
||||
Ok::<_, ()>(actix_connect::new_connector(resolver.clone()))
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let con = srv
|
||||
.block_on(conn.call(Connect::with("10", srv.addr())))
|
||||
.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
|
||||
let connect = Connect::new(srv.host().to_owned());
|
||||
let mut conn = srv
|
||||
.block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver))))
|
||||
.unwrap();
|
||||
let con = srv.block_on(conn.call(connect));
|
||||
assert!(con.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@@ -1,5 +1,11 @@
|
||||
# Changes
|
||||
|
||||
## [0.2.2] - 2019-03-28
|
||||
|
||||
### Changed
|
||||
|
||||
* Moved `blocking` module to `actix-threadpool` crate
|
||||
|
||||
## [0.2.1] - 2019-03-11
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-rt"
|
||||
version = "0.2.1"
|
||||
version = "0.2.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix runtime"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
@@ -11,21 +11,15 @@ categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
edition = "2018"
|
||||
workspace = "../"
|
||||
workspace = ".."
|
||||
|
||||
[lib]
|
||||
name = "actix_rt"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
bytes = "0.4"
|
||||
derive_more = "0.14"
|
||||
actix-threadpool = "0.1.0"
|
||||
futures = "0.1.25"
|
||||
parking_lot = "0.7"
|
||||
lazy_static = "1.2"
|
||||
log = "0.4"
|
||||
num_cpus = "1.10"
|
||||
threadpool = "1.7"
|
||||
tokio-current-thread = "0.1"
|
||||
tokio-executor = "0.1.5"
|
||||
tokio-reactor = "0.1.7"
|
||||
|
@@ -86,8 +86,7 @@ impl Builder {
|
||||
let (stop_tx, stop) = channel();
|
||||
let (sys_sender, sys_receiver) = unbounded();
|
||||
|
||||
let arbiter = Arbiter::new_system();
|
||||
let system = System::construct(sys_sender, arbiter.clone(), self.stop_on_panic);
|
||||
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
|
||||
|
||||
// system arbiter
|
||||
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
||||
|
@@ -1,7 +1,6 @@
|
||||
//! A runtime implementation that runs everything on the current thread.
|
||||
|
||||
mod arbiter;
|
||||
pub mod blocking;
|
||||
mod builder;
|
||||
mod runtime;
|
||||
mod system;
|
||||
@@ -11,6 +10,9 @@ pub use self::builder::{Builder, SystemRunner};
|
||||
pub use self::runtime::Runtime;
|
||||
pub use self::system::System;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use actix_threadpool as blocking;
|
||||
|
||||
/// Spawns a future on the current arbiter.
|
||||
///
|
||||
/// # Panics
|
||||
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.4.2] - 2019-03-30
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix SIGINT force shutdown
|
||||
|
||||
|
||||
## [0.4.1] - 2019-03-14
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-server"
|
||||
version = "0.4.1"
|
||||
version = "0.4.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix server - General purpose tcp server"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -328,7 +328,7 @@ impl ServerBuilder {
|
||||
self.accept.send(Command::Stop);
|
||||
|
||||
// stop workers
|
||||
if !self.workers.is_empty() {
|
||||
if !self.workers.is_empty() && graceful {
|
||||
spawn(
|
||||
futures_unordered(
|
||||
self.workers
|
||||
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.3.5] - 2019-03-29
|
||||
|
||||
### Added
|
||||
|
||||
* Add `impl<S: Service> Service for Rc<RefCell<S>>`
|
||||
|
||||
|
||||
## [0.3.4] - 2019-03-12
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-service"
|
||||
version = "0.3.4"
|
||||
version = "0.3.5"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix Service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -1,3 +1,4 @@
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -391,6 +392,24 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Service for Rc<RefCell<S>>
|
||||
where
|
||||
S: Service,
|
||||
{
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), S::Error> {
|
||||
self.borrow_mut().poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Self::Request) -> S::Future {
|
||||
self.borrow_mut().call(request)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, C> NewService<C> for Rc<S>
|
||||
where
|
||||
S: NewService<C>,
|
||||
|
5
actix-threadpool/CHANGES.md
Normal file
5
actix-threadpool/CHANGES.md
Normal file
@@ -0,0 +1,5 @@
|
||||
# Changes
|
||||
|
||||
## [0.1.0] - 2019-03-28
|
||||
|
||||
* Move threadpool to separate crate
|
27
actix-threadpool/Cargo.toml
Normal file
27
actix-threadpool/Cargo.toml
Normal file
@@ -0,0 +1,27 @@
|
||||
[package]
|
||||
name = "actix-threadpool"
|
||||
version = "0.1.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix thread pool for sync code"
|
||||
keywords = ["actix", "network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-threadpool/"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
edition = "2018"
|
||||
workspace = ".."
|
||||
|
||||
[lib]
|
||||
name = "actix_threadpool"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
derive_more = "0.14"
|
||||
futures = "0.1.25"
|
||||
parking_lot = "0.7"
|
||||
lazy_static = "1.2"
|
||||
log = "0.4"
|
||||
num_cpus = "1.10"
|
||||
threadpool = "1.7"
|
@@ -9,7 +9,7 @@ use parking_lot::Mutex;
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
/// Env variable for default cpu pool size
|
||||
const ENV_CPU_POOL_VAR: &str = "ACTIX_CPU_POOL";
|
||||
const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL";
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub(crate) static ref DEFAULT_POOL: Mutex<ThreadPool> = {
|
||||
@@ -18,7 +18,7 @@ lazy_static::lazy_static! {
|
||||
if let Ok(val) = val.parse() {
|
||||
val
|
||||
} else {
|
||||
log::error!("Can not parse ACTIX_CPU_POOL value");
|
||||
log::error!("Can not parse ACTIX_THREADPOOL value");
|
||||
num_cpus::get() * 5
|
||||
}
|
||||
}
|
@@ -1,5 +1,10 @@
|
||||
# Changes
|
||||
|
||||
### Added
|
||||
|
||||
* Allow to send messages to `FramedTransport` via mpsc channel.
|
||||
|
||||
|
||||
## [0.3.4] - 2019-03-12
|
||||
|
||||
### Changed
|
||||
|
@@ -7,6 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||
use actix_service::{IntoNewService, IntoService, NewService, Service};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::task::AtomicTask;
|
||||
use futures::unsync::mpsc;
|
||||
use futures::{Async, Future, IntoFuture, Poll, Sink, Stream};
|
||||
use log::debug;
|
||||
|
||||
@@ -178,6 +179,11 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> {
|
||||
}
|
||||
}
|
||||
|
||||
pub enum FramedMessage<T> {
|
||||
Message(T),
|
||||
Close,
|
||||
}
|
||||
|
||||
/// FramedTransport - is a future that reads frames from Framed object
|
||||
/// and pass then to the service.
|
||||
pub struct FramedTransport<S, T, U>
|
||||
@@ -193,6 +199,7 @@ where
|
||||
service: S,
|
||||
state: TransportState<S, U>,
|
||||
framed: Framed<T, U>,
|
||||
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
|
||||
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
|
||||
}
|
||||
|
||||
@@ -200,6 +207,7 @@ enum TransportState<S: Service, U: Encoder + Decoder> {
|
||||
Processing,
|
||||
Error(FramedTransportError<S::Error, U>),
|
||||
FramedError(FramedTransportError<S::Error, U>),
|
||||
FlushAndStop,
|
||||
Stopping,
|
||||
}
|
||||
|
||||
@@ -257,10 +265,12 @@ where
|
||||
/// write to framed object
|
||||
fn poll_write(&mut self) -> bool {
|
||||
let inner = self.inner.get_mut();
|
||||
let mut rx_done = self.rx.is_none();
|
||||
let mut buf_empty = inner.buf.is_empty();
|
||||
loop {
|
||||
while !self.framed.is_write_buf_full() {
|
||||
if let Some(msg) = inner.buf.pop_front() {
|
||||
match msg {
|
||||
if !buf_empty {
|
||||
match inner.buf.pop_front().unwrap() {
|
||||
Ok(msg) => {
|
||||
if let Err(err) = self.framed.force_send(msg) {
|
||||
self.state = TransportState::FramedError(
|
||||
@@ -268,6 +278,7 @@ where
|
||||
);
|
||||
return true;
|
||||
}
|
||||
buf_empty = inner.buf.is_empty();
|
||||
}
|
||||
Err(err) => {
|
||||
self.state =
|
||||
@@ -275,7 +286,33 @@ where
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
if !rx_done && self.rx.is_some() {
|
||||
match self.rx.as_mut().unwrap().poll() {
|
||||
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
|
||||
if let Err(err) = self.framed.force_send(msg) {
|
||||
self.state = TransportState::FramedError(
|
||||
FramedTransportError::Encoder(err),
|
||||
);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Ok(Async::Ready(Some(FramedMessage::Close))) => {
|
||||
self.state = TransportState::FlushAndStop;
|
||||
return true;
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
let _ = self.rx.take();
|
||||
}
|
||||
Ok(Async::NotReady) => rx_done = true,
|
||||
Err(_e) => {
|
||||
let _ = self.rx.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if rx_done && buf_empty {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -313,6 +350,7 @@ where
|
||||
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
|
||||
FramedTransport {
|
||||
framed,
|
||||
rx: None,
|
||||
service: service.into_service(),
|
||||
state: TransportState::Processing,
|
||||
inner: Cell::new(FramedTransportInner {
|
||||
@@ -322,6 +360,15 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Get Sender
|
||||
pub fn set_receiver(
|
||||
mut self,
|
||||
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
|
||||
) -> Self {
|
||||
self.rx = Some(rx);
|
||||
self
|
||||
}
|
||||
|
||||
/// Get reference to a service wrapped by `FramedTransport` instance.
|
||||
pub fn get_ref(&self) -> &S {
|
||||
&self.service
|
||||
@@ -378,6 +425,20 @@ where
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
TransportState::FlushAndStop => {
|
||||
if !self.framed.is_write_buf_empty() {
|
||||
match self.framed.poll_complete() {
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {:?}", err);
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
|
||||
}
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
TransportState::FramedError(err) => Err(err),
|
||||
TransportState::Stopping => Ok(Async::Ready(())),
|
||||
}
|
||||
|
Reference in New Issue
Block a user