mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-14 21:30:32 +02:00
Compare commits
7 Commits
connect-v0
...
codec-0.1.
Author | SHA1 | Date | |
---|---|---|---|
|
16856c7d3f | ||
|
95d02659d5 | ||
|
bcbd7e6ddf | ||
|
e0d3581239 | ||
|
ef1bdb2eb2 | ||
|
10301ff49d | ||
|
27c28d6597 |
27
.travis.yml
27
.travis.yml
@@ -37,36 +37,13 @@ script:
|
|||||||
- |
|
- |
|
||||||
if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-03-02" ]]; then
|
if [[ "$TRAVIS_RUST_VERSION" != "nightly-2019-03-02" ]]; then
|
||||||
cargo clean
|
cargo clean
|
||||||
cargo test --features="ssl,tls,rust-tls" -- --nocapture
|
cargo test --all --all-features -- --nocapture
|
||||||
cd actix-codec && cargo test && cd ..
|
|
||||||
cd actix-service && cargo test && cd ..
|
|
||||||
cd actix-server && cargo test --all-features -- --nocapture && cd ..
|
|
||||||
cd actix-rt && cargo test && cd ..
|
|
||||||
cd actix-connect && cargo test && cd ..
|
|
||||||
cd actix-utils && cargo test && cd ..
|
|
||||||
cd router && cargo test && cd ..
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
after_success:
|
after_success:
|
||||||
- |
|
- |
|
||||||
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then
|
if [[ "$TRAVIS_RUST_VERSION" == "nightly-2019-03-02" ]]; then
|
||||||
#cd actix-service && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
taskset -c 0 cargo tarpaulin --all --all-features --out Xml
|
||||||
#cd actix-rt && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
|
||||||
#cd actix-connect && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
|
||||||
#cd actix-codec && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
|
||||||
#cd actix-server && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
|
||||||
#cd actix-utils && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
|
||||||
#cd router && cargo tarpaulin --out Xml && bash <(curl -s https://codecov.io/bash) && cd ..
|
|
||||||
|
|
||||||
cd actix-service && cargo tarpaulin --out Xml && cd ..
|
|
||||||
#cd actix-rt && cargo tarpaulin --out Xml && cd ..
|
|
||||||
cd actix-connect && cargo tarpaulin --out Xml && cd ..
|
|
||||||
#cd actix-codec && cargo tarpaulin --out Xml && cd ..
|
|
||||||
#cd actix-server && cargo tarpaulin --out Xml && cd ..
|
|
||||||
cd actix-utils && cargo tarpaulin --out Xml && cd ..
|
|
||||||
cd router && cargo tarpaulin --out Xml && cd ..
|
|
||||||
|
|
||||||
# cargo tarpaulin --all --all-features --out Xml
|
|
||||||
echo "Uploaded code coverage"
|
echo "Uploaded code coverage"
|
||||||
bash <(curl -s https://codecov.io/bash)
|
bash <(curl -s https://codecov.io/bash)
|
||||||
fi
|
fi
|
||||||
|
@@ -1,6 +1,11 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## [0.1.0] - 2019-03-06
|
## [0.1.2] - 2019-03-27
|
||||||
|
|
||||||
|
* Added `Framed::map_io()` method.
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.1] - 2019-03-06
|
||||||
|
|
||||||
* Added `FramedParts::with_read_buffer()` method.
|
* Added `FramedParts::with_read_buffer()` method.
|
||||||
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-codec"
|
name = "actix-codec"
|
||||||
version = "0.1.1"
|
version = "0.1.2"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Utilities for encoding and decoding frames"
|
description = "Utilities for encoding and decoding frames"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
@@ -11,15 +11,15 @@ categories = ["network-programming", "asynchronous"]
|
|||||||
license = "MIT/Apache-2.0"
|
license = "MIT/Apache-2.0"
|
||||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
workspace = "../"
|
workspace = ".."
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
name = "actix_codec"
|
name = "actix_codec"
|
||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "0.4"
|
bytes = "0.4.12"
|
||||||
futures = "0.1.24"
|
futures = "0.1.24"
|
||||||
tokio-io = "0.1"
|
tokio-io = "0.1.12"
|
||||||
tokio-codec = "0.1"
|
tokio-codec = "0.1.1"
|
||||||
log = "0.4"
|
log = "0.4"
|
@@ -167,6 +167,22 @@ impl<T, U> Framed<T, U> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Consume the `Frame`, returning `Frame` with different io.
|
||||||
|
pub fn map_io<F, T2>(self, f: F) -> Framed<T2, U>
|
||||||
|
where
|
||||||
|
F: Fn(T) -> T2,
|
||||||
|
{
|
||||||
|
let (inner, read_buf) = self.inner.into_parts();
|
||||||
|
let (inner, write_buf, lw, hw) = inner.into_parts();
|
||||||
|
|
||||||
|
Framed {
|
||||||
|
inner: framed_read2_with_buffer(
|
||||||
|
framed_write2_with_buffer(Fuse(f(inner.0), inner.1), write_buf, lw, hw),
|
||||||
|
read_buf,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Consume the `Frame`, returning `Frame` with different codec.
|
/// Consume the `Frame`, returning `Frame` with different codec.
|
||||||
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
|
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
|
||||||
where
|
where
|
||||||
|
@@ -1,5 +1,12 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.1] - 2019-03-15
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix error handling for single address
|
||||||
|
|
||||||
|
|
||||||
## [0.1.0] - 2019-03-14
|
## [0.1.0] - 2019-03-14
|
||||||
|
|
||||||
* Refactor resolver and connector services
|
* Refactor resolver and connector services
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-connect"
|
name = "actix-connect"
|
||||||
version = "0.1.0"
|
version = "0.1.1"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix Connector - tcp connector service"
|
description = "Actix Connector - tcp connector service"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@@ -140,7 +140,7 @@ impl<T: Address> Future for ConnectorResponse<T> {
|
|||||||
self.req.as_ref().unwrap().host(),
|
self.req.as_ref().unwrap().host(),
|
||||||
self.port,
|
self.port,
|
||||||
);
|
);
|
||||||
if self.addrs.as_ref().unwrap().is_empty() {
|
if self.addrs.is_none() || self.addrs.as_ref().unwrap().is_empty() {
|
||||||
return Err(err.into());
|
return Err(err.into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -153,7 +153,9 @@ impl<T: Address> Future for ResolverFuture<T> {
|
|||||||
req.host(),
|
req.host(),
|
||||||
addrs
|
addrs
|
||||||
);
|
);
|
||||||
if addrs.len() == 1 {
|
if addrs.is_empty() {
|
||||||
|
Err(ConnectError::NoRecords)
|
||||||
|
} else if addrs.len() == 1 {
|
||||||
req.addr = Some(either::Either::Left(addrs.pop_front().unwrap()));
|
req.addr = Some(either::Either::Left(addrs.pop_front().unwrap()));
|
||||||
Ok(Async::Ready(req))
|
Ok(Async::Ready(req))
|
||||||
} else {
|
} else {
|
||||||
|
@@ -26,6 +26,7 @@ fn port(scheme: Option<&str>) -> Option<u16> {
|
|||||||
"wss" => Some(443),
|
"wss" => Some(443),
|
||||||
"amqp" => Some(5672),
|
"amqp" => Some(5672),
|
||||||
"amqps" => Some(5671),
|
"amqps" => Some(5671),
|
||||||
|
"sb" => Some(5671),
|
||||||
"mqtt" => Some(1883),
|
"mqtt" => Some(1883),
|
||||||
"mqtts" => Some(8883),
|
"mqtts" => Some(8883),
|
||||||
_ => None,
|
_ => None,
|
||||||
|
@@ -43,13 +43,22 @@ fn test_static_str() {
|
|||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut conn = srv
|
let mut conn = srv
|
||||||
.block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver))))
|
.block_on(lazy(|| {
|
||||||
|
Ok::<_, ()>(actix_connect::new_connector(resolver.clone()))
|
||||||
|
}))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let con = srv
|
let con = srv
|
||||||
.block_on(conn.call(Connect::with("10", srv.addr())))
|
.block_on(conn.call(Connect::with("10", srv.addr())))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||||
|
|
||||||
|
let connect = Connect::new(srv.host().to_owned());
|
||||||
|
let mut conn = srv
|
||||||
|
.block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver))))
|
||||||
|
.unwrap();
|
||||||
|
let con = srv.block_on(conn.call(connect));
|
||||||
|
assert!(con.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@@ -86,8 +86,7 @@ impl Builder {
|
|||||||
let (stop_tx, stop) = channel();
|
let (stop_tx, stop) = channel();
|
||||||
let (sys_sender, sys_receiver) = unbounded();
|
let (sys_sender, sys_receiver) = unbounded();
|
||||||
|
|
||||||
let arbiter = Arbiter::new_system();
|
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
|
||||||
let system = System::construct(sys_sender, arbiter.clone(), self.stop_on_panic);
|
|
||||||
|
|
||||||
// system arbiter
|
// system arbiter
|
||||||
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
||||||
|
@@ -1,5 +1,10 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Allow to send messages to `FramedTransport` via mpsc channel.
|
||||||
|
|
||||||
|
|
||||||
## [0.3.4] - 2019-03-12
|
## [0.3.4] - 2019-03-12
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
@@ -7,6 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
|||||||
use actix_service::{IntoNewService, IntoService, NewService, Service};
|
use actix_service::{IntoNewService, IntoService, NewService, Service};
|
||||||
use futures::future::{ok, FutureResult};
|
use futures::future::{ok, FutureResult};
|
||||||
use futures::task::AtomicTask;
|
use futures::task::AtomicTask;
|
||||||
|
use futures::unsync::mpsc;
|
||||||
use futures::{Async, Future, IntoFuture, Poll, Sink, Stream};
|
use futures::{Async, Future, IntoFuture, Poll, Sink, Stream};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
|
||||||
@@ -178,6 +179,11 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum FramedMessage<T> {
|
||||||
|
Message(T),
|
||||||
|
Close,
|
||||||
|
}
|
||||||
|
|
||||||
/// FramedTransport - is a future that reads frames from Framed object
|
/// FramedTransport - is a future that reads frames from Framed object
|
||||||
/// and pass then to the service.
|
/// and pass then to the service.
|
||||||
pub struct FramedTransport<S, T, U>
|
pub struct FramedTransport<S, T, U>
|
||||||
@@ -193,6 +199,7 @@ where
|
|||||||
service: S,
|
service: S,
|
||||||
state: TransportState<S, U>,
|
state: TransportState<S, U>,
|
||||||
framed: Framed<T, U>,
|
framed: Framed<T, U>,
|
||||||
|
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
|
||||||
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
|
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -200,6 +207,7 @@ enum TransportState<S: Service, U: Encoder + Decoder> {
|
|||||||
Processing,
|
Processing,
|
||||||
Error(FramedTransportError<S::Error, U>),
|
Error(FramedTransportError<S::Error, U>),
|
||||||
FramedError(FramedTransportError<S::Error, U>),
|
FramedError(FramedTransportError<S::Error, U>),
|
||||||
|
FlushAndStop,
|
||||||
Stopping,
|
Stopping,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -257,10 +265,12 @@ where
|
|||||||
/// write to framed object
|
/// write to framed object
|
||||||
fn poll_write(&mut self) -> bool {
|
fn poll_write(&mut self) -> bool {
|
||||||
let inner = self.inner.get_mut();
|
let inner = self.inner.get_mut();
|
||||||
|
let mut rx_done = self.rx.is_none();
|
||||||
|
let mut buf_empty = inner.buf.is_empty();
|
||||||
loop {
|
loop {
|
||||||
while !self.framed.is_write_buf_full() {
|
while !self.framed.is_write_buf_full() {
|
||||||
if let Some(msg) = inner.buf.pop_front() {
|
if !buf_empty {
|
||||||
match msg {
|
match inner.buf.pop_front().unwrap() {
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
if let Err(err) = self.framed.force_send(msg) {
|
if let Err(err) = self.framed.force_send(msg) {
|
||||||
self.state = TransportState::FramedError(
|
self.state = TransportState::FramedError(
|
||||||
@@ -268,6 +278,7 @@ where
|
|||||||
);
|
);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
buf_empty = inner.buf.is_empty();
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.state =
|
self.state =
|
||||||
@@ -275,7 +286,33 @@ where
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
if !rx_done && self.rx.is_some() {
|
||||||
|
match self.rx.as_mut().unwrap().poll() {
|
||||||
|
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
|
||||||
|
if let Err(err) = self.framed.force_send(msg) {
|
||||||
|
self.state = TransportState::FramedError(
|
||||||
|
FramedTransportError::Encoder(err),
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Async::Ready(Some(FramedMessage::Close))) => {
|
||||||
|
self.state = TransportState::FlushAndStop;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Ok(Async::Ready(None)) => {
|
||||||
|
let _ = self.rx.take();
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => rx_done = true,
|
||||||
|
Err(_e) => {
|
||||||
|
let _ = self.rx.take();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rx_done && buf_empty {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -313,6 +350,7 @@ where
|
|||||||
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
|
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
|
||||||
FramedTransport {
|
FramedTransport {
|
||||||
framed,
|
framed,
|
||||||
|
rx: None,
|
||||||
service: service.into_service(),
|
service: service.into_service(),
|
||||||
state: TransportState::Processing,
|
state: TransportState::Processing,
|
||||||
inner: Cell::new(FramedTransportInner {
|
inner: Cell::new(FramedTransportInner {
|
||||||
@@ -322,6 +360,15 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get Sender
|
||||||
|
pub fn set_receiver(
|
||||||
|
mut self,
|
||||||
|
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
|
||||||
|
) -> Self {
|
||||||
|
self.rx = Some(rx);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Get reference to a service wrapped by `FramedTransport` instance.
|
/// Get reference to a service wrapped by `FramedTransport` instance.
|
||||||
pub fn get_ref(&self) -> &S {
|
pub fn get_ref(&self) -> &S {
|
||||||
&self.service
|
&self.service
|
||||||
@@ -378,6 +425,20 @@ where
|
|||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
TransportState::FlushAndStop => {
|
||||||
|
if !self.framed.is_write_buf_empty() {
|
||||||
|
match self.framed.poll_complete() {
|
||||||
|
Err(err) => {
|
||||||
|
debug!("Error sending data: {:?}", err);
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
|
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
TransportState::FramedError(err) => Err(err),
|
TransportState::FramedError(err) => Err(err),
|
||||||
TransportState::Stopping => Ok(Async::Ready(())),
|
TransportState::Stopping => Ok(Async::Ready(())),
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user