mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-04 18:06:23 +02:00
Compare commits
9 Commits
utils-0.1.
...
server-0.1
Author | SHA1 | Date | |
---|---|---|---|
37d28304c9 | |||
640c39fdc8 | |||
cd5435e5ee | |||
bf9bd97173 | |||
61939c7af2 | |||
e8a1664c15 | |||
d1bfae7414 | |||
5ca00dc798 | |||
fd3e77ea83 |
@ -1,5 +1,29 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.3] - 2018-12-21
|
||||||
|
|
||||||
|
## Fixed
|
||||||
|
|
||||||
|
* Fix max concurrent connections handling
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.2] - 2018-12-12
|
||||||
|
|
||||||
|
## Changed
|
||||||
|
|
||||||
|
* rename ServiceConfig::rt() to ServiceConfig::apply()
|
||||||
|
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix back-pressure for concurrent ssl handshakes
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.1] - 2018-12-11
|
||||||
|
|
||||||
|
* Fix signal handling on windows
|
||||||
|
|
||||||
|
|
||||||
## [0.1.0] - 2018-12-09
|
## [0.1.0] - 2018-12-09
|
||||||
|
|
||||||
* Move server to separate crate
|
* Move server to separate crate
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-server"
|
name = "actix-server"
|
||||||
version = "0.1.0"
|
version = "0.1.3"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix server - General purpose tcp server"
|
description = "Actix server - General purpose tcp server"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
@ -66,3 +66,6 @@ webpki-roots = { version = "0.15", optional = true }
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
env_logger = "0.5"
|
env_logger = "0.5"
|
||||||
|
actix-service = "0.1.1"
|
||||||
|
actix-codec = "0.1.0"
|
||||||
|
actix-rt = "0.1.0"
|
||||||
|
@ -64,7 +64,7 @@ impl ServerBuilder {
|
|||||||
|
|
||||||
/// Set number of workers to start.
|
/// Set number of workers to start.
|
||||||
///
|
///
|
||||||
/// By default server uses number of available logical cpu as threads
|
/// By default server uses number of available logical cpu as workers
|
||||||
/// count.
|
/// count.
|
||||||
pub fn workers(mut self, num: usize) -> Self {
|
pub fn workers(mut self, num: usize) -> Self {
|
||||||
self.threads = num;
|
self.threads = num;
|
||||||
@ -108,8 +108,8 @@ impl ServerBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run external configuration as part of the server building
|
/// Execute external configuration as part of the server building
|
||||||
/// process
|
/// process.
|
||||||
///
|
///
|
||||||
/// This function is useful for moving parts of configuration to a
|
/// This function is useful for moving parts of configuration to a
|
||||||
/// different module or even library.
|
/// different module or even library.
|
||||||
@ -117,17 +117,20 @@ impl ServerBuilder {
|
|||||||
where
|
where
|
||||||
F: Fn(&mut ServiceConfig) -> io::Result<()>,
|
F: Fn(&mut ServiceConfig) -> io::Result<()>,
|
||||||
{
|
{
|
||||||
let mut cfg = ServiceConfig::new();
|
let mut cfg = ServiceConfig::new(self.threads);
|
||||||
|
|
||||||
f(&mut cfg)?;
|
f(&mut cfg)?;
|
||||||
|
|
||||||
let mut srv = ConfiguredService::new(cfg.rt);
|
if let Some(apply) = cfg.apply {
|
||||||
for (name, lst) in cfg.services {
|
let mut srv = ConfiguredService::new(apply);
|
||||||
let token = self.token.next();
|
for (name, lst) in cfg.services {
|
||||||
srv.stream(token, name);
|
let token = self.token.next();
|
||||||
self.sockets.push((token, lst));
|
srv.stream(token, name);
|
||||||
|
self.sockets.push((token, lst));
|
||||||
|
}
|
||||||
|
self.services.push(Box::new(srv));
|
||||||
}
|
}
|
||||||
self.services.push(Box::new(srv));
|
self.threads = cfg.threads;
|
||||||
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
@ -15,18 +15,28 @@ use super::services::{
|
|||||||
use super::Token;
|
use super::Token;
|
||||||
|
|
||||||
pub struct ServiceConfig {
|
pub struct ServiceConfig {
|
||||||
pub(super) services: Vec<(String, net::TcpListener)>,
|
pub(crate) services: Vec<(String, net::TcpListener)>,
|
||||||
pub(super) rt: Box<ServiceRuntimeConfiguration>,
|
pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>,
|
||||||
|
pub(crate) threads: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceConfig {
|
impl ServiceConfig {
|
||||||
pub(super) fn new() -> ServiceConfig {
|
pub(super) fn new(threads: usize) -> ServiceConfig {
|
||||||
ServiceConfig {
|
ServiceConfig {
|
||||||
|
threads,
|
||||||
services: Vec::new(),
|
services: Vec::new(),
|
||||||
rt: Box::new(not_configured),
|
apply: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set number of workers to start.
|
||||||
|
///
|
||||||
|
/// By default server uses number of available logical cpu as workers
|
||||||
|
/// count.
|
||||||
|
pub fn workers(&mut self, num: usize) {
|
||||||
|
self.threads = num;
|
||||||
|
}
|
||||||
|
|
||||||
/// Add new service to server
|
/// Add new service to server
|
||||||
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
|
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
|
||||||
where
|
where
|
||||||
@ -43,16 +53,20 @@ impl ServiceConfig {
|
|||||||
|
|
||||||
/// Add new service to server
|
/// Add new service to server
|
||||||
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
|
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
|
||||||
|
if self.apply.is_none() {
|
||||||
|
self.apply = Some(Box::new(not_configured));
|
||||||
|
}
|
||||||
self.services.push((name.as_ref().to_string(), lst));
|
self.services.push((name.as_ref().to_string(), lst));
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register service configuration function
|
/// Register service configuration function. This function get called
|
||||||
pub fn rt<F>(&mut self, f: F) -> io::Result<()>
|
/// during worker runtime configuration. It get executed in worker thread.
|
||||||
|
pub fn apply<F>(&mut self, f: F) -> io::Result<()>
|
||||||
where
|
where
|
||||||
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
|
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
|
||||||
{
|
{
|
||||||
self.rt = Box::new(f);
|
self.apply = Some(Box::new(f));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ use futures::task::AtomicTask;
|
|||||||
/// Counter could be cloned, total ncount is shared across all clones.
|
/// Counter could be cloned, total ncount is shared across all clones.
|
||||||
pub struct Counter(Rc<CounterInner>);
|
pub struct Counter(Rc<CounterInner>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct CounterInner {
|
struct CounterInner {
|
||||||
count: Cell<usize>,
|
count: Cell<usize>,
|
||||||
capacity: usize,
|
capacity: usize,
|
||||||
@ -40,6 +41,7 @@ impl Counter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct CounterGuard(Rc<CounterInner>);
|
pub struct CounterGuard(Rc<CounterInner>);
|
||||||
|
|
||||||
impl CounterGuard {
|
impl CounterGuard {
|
||||||
@ -57,11 +59,7 @@ impl Drop for CounterGuard {
|
|||||||
|
|
||||||
impl CounterInner {
|
impl CounterInner {
|
||||||
fn inc(&self) {
|
fn inc(&self) {
|
||||||
let num = self.count.get() + 1;
|
self.count.set(self.count.get() + 1);
|
||||||
self.count.set(num);
|
|
||||||
if num == self.capacity {
|
|
||||||
self.task.register();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dec(&self) {
|
fn dec(&self) {
|
||||||
@ -73,6 +71,10 @@ impl CounterInner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn available(&self) -> bool {
|
fn available(&self) -> bool {
|
||||||
self.count.get() < self.capacity
|
let avail = self.count.get() < self.capacity;
|
||||||
|
if !avail {
|
||||||
|
self.task.register();
|
||||||
|
}
|
||||||
|
avail
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -83,9 +83,9 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
if let Ok(stream) = stream {
|
if let Ok(stream) = stream {
|
||||||
spawn(self.service.call(stream).map_err(|_| ()).map(move |val| {
|
spawn(self.service.call(stream).then(move |res| {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
val
|
res.map_err(|_| ())
|
||||||
}));
|
}));
|
||||||
ok(())
|
ok(())
|
||||||
} else {
|
} else {
|
||||||
@ -122,9 +122,9 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
|
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
|
||||||
spawn(self.service.call(req).map_err(|_| ()).map(move |val| {
|
spawn(self.service.call(req).then(move |res| {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
val
|
res.map_err(|_| ())
|
||||||
}));
|
}));
|
||||||
ok(())
|
ok(())
|
||||||
}
|
}
|
||||||
|
@ -34,10 +34,12 @@ impl Signals {
|
|||||||
let fut = {
|
let fut = {
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
{
|
{
|
||||||
tokio_signal::ctrl_c().and_then(move |stream| Signals {
|
tokio_signal::ctrl_c()
|
||||||
srv,
|
.map_err(|_| ())
|
||||||
stream: Box::new(stream.map(|_| Signal::Int)),
|
.and_then(move |stream| Signals {
|
||||||
})
|
srv,
|
||||||
|
stream: Box::new(stream.map(|_| Signal::Int)),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
|
@ -398,7 +398,7 @@ impl Future for Worker {
|
|||||||
let guard = self.conns.get();
|
let guard = self.conns.get();
|
||||||
let _ = self.services[msg.token.0]
|
let _ = self.services[msg.token.0]
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.expect("actix net bug")
|
.expect("actix-server bug")
|
||||||
.1
|
.1
|
||||||
.call((Some(guard), ServerMessage::Connect(msg.io)));
|
.call((Some(guard), ServerMessage::Connect(msg.io)));
|
||||||
continue;
|
continue;
|
||||||
|
@ -1,5 +1,19 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.3] - 2018-12-12
|
||||||
|
|
||||||
|
## Changed
|
||||||
|
|
||||||
|
* Split service combinators to separate trait
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.2] - 2018-12-12
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Release future early for `.and_then()` and `.then()` combinators
|
||||||
|
|
||||||
|
|
||||||
## [0.1.1] - 2018-12-09
|
## [0.1.1] - 2018-12-09
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-service"
|
name = "actix-service"
|
||||||
version = "0.1.1"
|
version = "0.1.3"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix Service"
|
description = "Actix Service"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@ -61,7 +61,7 @@ where
|
|||||||
{
|
{
|
||||||
b: Cell<B>,
|
b: Cell<B>,
|
||||||
fut_b: Option<B::Future>,
|
fut_b: Option<B::Future>,
|
||||||
fut_a: A::Future,
|
fut_a: Option<A::Future>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B, Request> AndThenFuture<A, B, Request>
|
impl<A, B, Request> AndThenFuture<A, B, Request>
|
||||||
@ -69,10 +69,10 @@ where
|
|||||||
A: Service<Request>,
|
A: Service<Request>,
|
||||||
B: Service<A::Response, Error = A::Error>,
|
B: Service<A::Response, Error = A::Error>,
|
||||||
{
|
{
|
||||||
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
|
fn new(a: A::Future, b: Cell<B>) -> Self {
|
||||||
AndThenFuture {
|
AndThenFuture {
|
||||||
b,
|
b,
|
||||||
fut_a,
|
fut_a: Some(a),
|
||||||
fut_b: None,
|
fut_b: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -91,8 +91,9 @@ where
|
|||||||
return fut.poll();
|
return fut.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.fut_a.poll() {
|
match self.fut_a.as_mut().expect("Bug in actix-service").poll() {
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.get_mut().call(resp));
|
self.fut_b = Some(self.b.get_mut().call(resp));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
@ -218,7 +219,7 @@ mod tests {
|
|||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{NewService, Service};
|
use crate::{NewService, Service, ServiceExt};
|
||||||
|
|
||||||
struct Srv1(Rc<Cell<usize>>);
|
struct Srv1(Rc<Cell<usize>>);
|
||||||
impl Service<&'static str> for Srv1 {
|
impl Service<&'static str> for Srv1 {
|
||||||
|
@ -171,7 +171,7 @@ mod tests {
|
|||||||
use futures::future::{ok, FutureResult};
|
use futures::future::{ok, FutureResult};
|
||||||
use futures::{Async, Future, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
|
|
||||||
use crate::{IntoNewService, IntoService, NewService, Service};
|
use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct Srv;
|
struct Srv;
|
||||||
|
@ -159,7 +159,7 @@ mod tests {
|
|||||||
use futures::future::{err, FutureResult};
|
use futures::future::{err, FutureResult};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{IntoNewService, NewService, Service};
|
use crate::{IntoNewService, NewService, Service, ServiceExt};
|
||||||
|
|
||||||
struct Srv;
|
struct Srv;
|
||||||
impl Service<()> for Srv {
|
impl Service<()> for Srv {
|
||||||
|
@ -50,7 +50,11 @@ pub trait Service<Request> {
|
|||||||
/// Calling `call` without calling `poll_ready` is permitted. The
|
/// Calling `call` without calling `poll_ready` is permitted. The
|
||||||
/// implementation must be resilient to this fact.
|
/// implementation must be resilient to this fact.
|
||||||
fn call(&mut self, req: Request) -> Self::Future;
|
fn call(&mut self, req: Request) -> Self::Future;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An extension trait for `Service`s that provides a variety of convenient
|
||||||
|
/// adapters
|
||||||
|
pub trait ServiceExt<Request>: Service<Request> {
|
||||||
/// Apply function to specified service and use it as a next service in
|
/// Apply function to specified service and use it as a next service in
|
||||||
/// chain.
|
/// chain.
|
||||||
fn apply<T, I, F, Out, Req>(
|
fn apply<T, I, F, Out, Req>(
|
||||||
@ -146,6 +150,8 @@ pub trait Service<Request> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}
|
||||||
|
|
||||||
/// Creates new `Service` values.
|
/// Creates new `Service` values.
|
||||||
///
|
///
|
||||||
/// Acts as a service factory. This is useful for cases where new `Service`
|
/// Acts as a service factory. This is useful for cases where new `Service`
|
||||||
|
@ -189,7 +189,7 @@ mod tests {
|
|||||||
use futures::future::{ok, FutureResult};
|
use futures::future::{ok, FutureResult};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{IntoNewService, Service};
|
use crate::{IntoNewService, Service, ServiceExt};
|
||||||
|
|
||||||
struct Srv;
|
struct Srv;
|
||||||
impl Service<()> for Srv {
|
impl Service<()> for Srv {
|
||||||
|
@ -190,7 +190,7 @@ mod tests {
|
|||||||
use futures::future::{err, FutureResult};
|
use futures::future::{err, FutureResult};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{IntoNewService, NewService, Service};
|
use crate::{IntoNewService, NewService, Service, ServiceExt};
|
||||||
|
|
||||||
struct Srv;
|
struct Srv;
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ where
|
|||||||
{
|
{
|
||||||
b: Cell<B>,
|
b: Cell<B>,
|
||||||
fut_b: Option<B::Future>,
|
fut_b: Option<B::Future>,
|
||||||
fut_a: A::Future,
|
fut_a: Option<A::Future>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B, Request> ThenFuture<A, B, Request>
|
impl<A, B, Request> ThenFuture<A, B, Request>
|
||||||
@ -69,10 +69,10 @@ where
|
|||||||
A: Service<Request>,
|
A: Service<Request>,
|
||||||
B: Service<Result<A::Response, A::Error>>,
|
B: Service<Result<A::Response, A::Error>>,
|
||||||
{
|
{
|
||||||
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
|
fn new(a: A::Future, b: Cell<B>) -> Self {
|
||||||
ThenFuture {
|
ThenFuture {
|
||||||
b,
|
b,
|
||||||
fut_a,
|
fut_a: Some(a),
|
||||||
fut_b: None,
|
fut_b: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -91,12 +91,14 @@ where
|
|||||||
return fut.poll();
|
return fut.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.fut_a.poll() {
|
match self.fut_a.as_mut().expect("bug in actix-service").poll() {
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.get_mut().call(Ok(resp)));
|
self.fut_b = Some(self.b.get_mut().call(Ok(resp)));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.get_mut().call(Err(err)));
|
self.fut_b = Some(self.b.get_mut().call(Err(err)));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
@ -221,11 +223,11 @@ where
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use futures::future::{err, ok, FutureResult};
|
use futures::future::{err, ok, FutureResult};
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
use std::cell::Cell;
|
use std::cell::Cell;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
use super::*;
|
use crate::{IntoNewService, NewService, Service, ServiceExt};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct Srv1(Rc<Cell<usize>>);
|
struct Srv1(Rc<Cell<usize>>);
|
||||||
|
@ -21,8 +21,7 @@ path = "src/lib.rs"
|
|||||||
actix-service = "0.1.1"
|
actix-service = "0.1.1"
|
||||||
actix-codec = "0.1.0"
|
actix-codec = "0.1.0"
|
||||||
actix-rt = "0.1.0"
|
actix-rt = "0.1.0"
|
||||||
|
|
||||||
# io
|
|
||||||
bytes = "0.4"
|
bytes = "0.4"
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
tokio-timer = "0.2.8"
|
tokio-timer = "0.2.8"
|
||||||
|
log = "0.4"
|
@ -7,7 +7,8 @@ use actix_rt::Arbiter;
|
|||||||
use actix_service::{IntoNewService, IntoService, NewService, Service};
|
use actix_service::{IntoNewService, IntoService, NewService, Service};
|
||||||
use futures::future::{ok, FutureResult};
|
use futures::future::{ok, FutureResult};
|
||||||
use futures::unsync::mpsc;
|
use futures::unsync::mpsc;
|
||||||
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
|
use futures::{Async, Future, Poll, Sink, Stream};
|
||||||
|
use log::debug;
|
||||||
|
|
||||||
type Request<U> = <U as Decoder>::Item;
|
type Request<U> = <U as Decoder>::Item;
|
||||||
type Response<U> = <U as Encoder>::Item;
|
type Response<U> = <U as Encoder>::Item;
|
||||||
@ -179,10 +180,8 @@ where
|
|||||||
state: TransportState<S, U>,
|
state: TransportState<S, U>,
|
||||||
framed: Framed<T, U>,
|
framed: Framed<T, U>,
|
||||||
request: Option<Request<U>>,
|
request: Option<Request<U>>,
|
||||||
response: Option<Response<U>>,
|
|
||||||
write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>,
|
write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>,
|
||||||
write_tx: mpsc::Sender<Result<Response<U>, S::Error>>,
|
write_tx: mpsc::Sender<Result<Response<U>, S::Error>>,
|
||||||
flushed: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> {
|
enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> {
|
||||||
@ -210,8 +209,6 @@ where
|
|||||||
service: service.into_service(),
|
service: service.into_service(),
|
||||||
state: TransportState::Processing,
|
state: TransportState::Processing,
|
||||||
request: None,
|
request: None,
|
||||||
response: None,
|
|
||||||
flushed: true,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,7 +244,7 @@ where
|
|||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
S::Error: 'static,
|
S::Error: 'static,
|
||||||
<U as Encoder>::Item: 'static,
|
<U as Encoder>::Item: 'static,
|
||||||
<U as Encoder>::Error: 'static,
|
<U as Encoder>::Error: std::fmt::Debug + 'static,
|
||||||
{
|
{
|
||||||
fn poll_service(&mut self) -> bool {
|
fn poll_service(&mut self) -> bool {
|
||||||
match self.service.poll_ready() {
|
match self.service.poll_ready() {
|
||||||
@ -307,59 +304,42 @@ where
|
|||||||
|
|
||||||
/// write to sink
|
/// write to sink
|
||||||
fn poll_response(&mut self) -> bool {
|
fn poll_response(&mut self) -> bool {
|
||||||
let mut item = self.response.take();
|
|
||||||
loop {
|
loop {
|
||||||
item = if let Some(msg) = item {
|
while !self.framed.is_write_buf_full() {
|
||||||
self.flushed = false;
|
match self.write_rx.poll() {
|
||||||
match self.framed.start_send(msg) {
|
Ok(Async::Ready(Some(msg))) => match msg {
|
||||||
Ok(AsyncSink::Ready) => None,
|
Ok(msg) => {
|
||||||
Ok(AsyncSink::NotReady(item)) => Some(item),
|
if let Err(err) = self.framed.force_send(msg) {
|
||||||
Err(err) => {
|
self.state = TransportState::EncoderError(
|
||||||
self.state =
|
FramedTransportError::Encoder(err),
|
||||||
TransportState::EncoderError(FramedTransportError::Encoder(err));
|
);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
Err(err) => {
|
||||||
None
|
self.state =
|
||||||
};
|
TransportState::Error(FramedTransportError::Service(err));
|
||||||
|
return true;
|
||||||
// flush sink
|
}
|
||||||
if !self.flushed {
|
},
|
||||||
match self.framed.poll_complete() {
|
|
||||||
Ok(Async::Ready(_)) => {
|
|
||||||
self.flushed = true;
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => break,
|
Ok(Async::NotReady) => break,
|
||||||
Err(err) => {
|
Err(_) => panic!("Bug in actix-net code"),
|
||||||
self.state =
|
Ok(Async::Ready(None)) => panic!("Bug in actix-net code"),
|
||||||
TransportState::EncoderError(FramedTransportError::Encoder(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check channel
|
if !self.framed.is_write_buf_empty() {
|
||||||
if self.flushed {
|
match self.framed.poll_complete() {
|
||||||
if item.is_none() {
|
Ok(Async::NotReady) => break,
|
||||||
match self.write_rx.poll() {
|
Err(err) => {
|
||||||
Ok(Async::Ready(Some(msg))) => match msg {
|
debug!("Error sending data: {:?}", err);
|
||||||
Ok(msg) => item = Some(msg),
|
self.state =
|
||||||
Err(err) => {
|
TransportState::EncoderError(FramedTransportError::Encoder(err));
|
||||||
self.state =
|
return true;
|
||||||
TransportState::Error(FramedTransportError::Service(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Ok(Async::NotReady) => break,
|
|
||||||
Err(_) => panic!("Bug in gw code"),
|
|
||||||
Ok(Async::Ready(None)) => panic!("Bug in gw code"),
|
|
||||||
}
|
}
|
||||||
} else {
|
Ok(Async::Ready(_)) => (),
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
self.response = item;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -376,7 +356,7 @@ where
|
|||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
S::Error: 'static,
|
S::Error: 'static,
|
||||||
<U as Encoder>::Item: 'static,
|
<U as Encoder>::Item: 'static,
|
||||||
<U as Encoder>::Error: 'static,
|
<U as Encoder>::Error: std::fmt::Debug + 'static,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Item = ();
|
||||||
type Error = FramedTransportError<S::Error, U>;
|
type Error = FramedTransportError<S::Error, U>;
|
||||||
@ -391,7 +371,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
TransportState::Error(err) => {
|
TransportState::Error(err) => {
|
||||||
if self.poll_response() || self.flushed {
|
if self.poll_response() || !self.framed.is_write_buf_empty() {
|
||||||
Err(err)
|
Err(err)
|
||||||
} else {
|
} else {
|
||||||
self.state = TransportState::Error(err);
|
self.state = TransportState::Error(err);
|
||||||
|
Reference in New Issue
Block a user