1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-22 00:56:15 +02:00

Compare commits

...

4 Commits

Author SHA1 Message Date
Nikolay Kim
37d28304c9 Fix max concurrent connections handling 2018-12-21 10:38:08 -08:00
Nikolay Kim
640c39fdc8 better usage for Framed type 2018-12-16 16:26:24 -08:00
Nikolay Kim
cd5435e5ee fix service tests 2018-12-12 18:56:39 -08:00
Nikolay Kim
bf9bd97173 split ServiceExt trait 2018-12-12 18:32:19 -08:00
14 changed files with 67 additions and 68 deletions

View File

@@ -1,5 +1,12 @@
# Changes
## [0.1.3] - 2018-12-21
## Fixed
* Fix max concurrent connections handling
## [0.1.2] - 2018-12-12
## Changed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "0.1.2"
version = "0.1.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -83,9 +83,9 @@ where
});
if let Ok(stream) = stream {
spawn(self.service.call(stream).map_err(|_| ()).map(move |val| {
spawn(self.service.call(stream).then(move |res| {
drop(guard);
val
res.map_err(|_| ())
}));
ok(())
} else {
@@ -122,9 +122,9 @@ where
}
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
spawn(self.service.call(req).map_err(|_| ()).map(move |val| {
spawn(self.service.call(req).then(move |res| {
drop(guard);
val
res.map_err(|_| ())
}));
ok(())
}

View File

@@ -1,5 +1,12 @@
# Changes
## [0.1.3] - 2018-12-12
## Changed
* Split service combinators to separate trait
## [0.1.2] - 2018-12-12
### Fixed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-service"
version = "0.1.2"
version = "0.1.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -219,7 +219,7 @@ mod tests {
use std::rc::Rc;
use super::*;
use crate::{NewService, Service};
use crate::{NewService, Service, ServiceExt};
struct Srv1(Rc<Cell<usize>>);
impl Service<&'static str> for Srv1 {

View File

@@ -171,7 +171,7 @@ mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use crate::{IntoNewService, IntoService, NewService, Service};
use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt};
#[derive(Clone)]
struct Srv;

View File

@@ -159,7 +159,7 @@ mod tests {
use futures::future::{err, FutureResult};
use super::*;
use crate::{IntoNewService, NewService, Service};
use crate::{IntoNewService, NewService, Service, ServiceExt};
struct Srv;
impl Service<()> for Srv {

View File

@@ -50,7 +50,11 @@ pub trait Service<Request> {
/// Calling `call` without calling `poll_ready` is permitted. The
/// implementation must be resilient to this fact.
fn call(&mut self, req: Request) -> Self::Future;
}
/// An extension trait for `Service`s that provides a variety of convenient
/// adapters
pub trait ServiceExt<Request>: Service<Request> {
/// Apply function to specified service and use it as a next service in
/// chain.
fn apply<T, I, F, Out, Req>(
@@ -146,6 +150,8 @@ pub trait Service<Request> {
}
}
impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}
/// Creates new `Service` values.
///
/// Acts as a service factory. This is useful for cases where new `Service`

View File

@@ -189,7 +189,7 @@ mod tests {
use futures::future::{ok, FutureResult};
use super::*;
use crate::{IntoNewService, Service};
use crate::{IntoNewService, Service, ServiceExt};
struct Srv;
impl Service<()> for Srv {

View File

@@ -190,7 +190,7 @@ mod tests {
use futures::future::{err, FutureResult};
use super::*;
use crate::{IntoNewService, NewService, Service};
use crate::{IntoNewService, NewService, Service, ServiceExt};
struct Srv;

View File

@@ -223,11 +223,11 @@ where
#[cfg(test)]
mod tests {
use futures::future::{err, ok, FutureResult};
use futures::{Async, Poll};
use futures::{Async, Future, Poll};
use std::cell::Cell;
use std::rc::Rc;
use super::*;
use crate::{IntoNewService, NewService, Service, ServiceExt};
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);

View File

@@ -21,8 +21,7 @@ path = "src/lib.rs"
actix-service = "0.1.1"
actix-codec = "0.1.0"
actix-rt = "0.1.0"
# io
bytes = "0.4"
futures = "0.1"
tokio-timer = "0.2.8"
log = "0.4"

View File

@@ -7,7 +7,8 @@ use actix_rt::Arbiter;
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, FutureResult};
use futures::unsync::mpsc;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use futures::{Async, Future, Poll, Sink, Stream};
use log::debug;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
@@ -179,10 +180,8 @@ where
state: TransportState<S, U>,
framed: Framed<T, U>,
request: Option<Request<U>>,
response: Option<Response<U>>,
write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>,
write_tx: mpsc::Sender<Result<Response<U>, S::Error>>,
flushed: bool,
}
enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> {
@@ -210,8 +209,6 @@ where
service: service.into_service(),
state: TransportState::Processing,
request: None,
response: None,
flushed: true,
}
}
@@ -247,7 +244,7 @@ where
S::Future: 'static,
S::Error: 'static,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: 'static,
<U as Encoder>::Error: std::fmt::Debug + 'static,
{
fn poll_service(&mut self) -> bool {
match self.service.poll_ready() {
@@ -307,59 +304,42 @@ where
/// write to sink
fn poll_response(&mut self) -> bool {
let mut item = self.response.take();
loop {
item = if let Some(msg) = item {
self.flushed = false;
match self.framed.start_send(msg) {
Ok(AsyncSink::Ready) => None,
Ok(AsyncSink::NotReady(item)) => Some(item),
Err(err) => {
self.state =
TransportState::EncoderError(FramedTransportError::Encoder(err));
return true;
}
}
} else {
None
};
// flush sink
if !self.flushed {
match self.framed.poll_complete() {
Ok(Async::Ready(_)) => {
self.flushed = true;
}
while !self.framed.is_write_buf_full() {
match self.write_rx.poll() {
Ok(Async::Ready(Some(msg))) => match msg {
Ok(msg) => {
if let Err(err) = self.framed.force_send(msg) {
self.state = TransportState::EncoderError(
FramedTransportError::Encoder(err),
);
return true;
}
}
Err(err) => {
self.state =
TransportState::Error(FramedTransportError::Service(err));
return true;
}
},
Ok(Async::NotReady) => break,
Err(err) => {
self.state =
TransportState::EncoderError(FramedTransportError::Encoder(err));
return true;
}
Err(_) => panic!("Bug in actix-net code"),
Ok(Async::Ready(None)) => panic!("Bug in actix-net code"),
}
}
// check channel
if self.flushed {
if item.is_none() {
match self.write_rx.poll() {
Ok(Async::Ready(Some(msg))) => match msg {
Ok(msg) => item = Some(msg),
Err(err) => {
self.state =
TransportState::Error(FramedTransportError::Service(err));
return true;
}
},
Ok(Async::NotReady) => break,
Err(_) => panic!("Bug in gw code"),
Ok(Async::Ready(None)) => panic!("Bug in gw code"),
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Ok(Async::NotReady) => break,
Err(err) => {
debug!("Error sending data: {:?}", err);
self.state =
TransportState::EncoderError(FramedTransportError::Encoder(err));
return true;
}
} else {
continue;
Ok(Async::Ready(_)) => (),
}
} else {
self.response = item;
break;
}
}
@@ -376,7 +356,7 @@ where
S::Future: 'static,
S::Error: 'static,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: 'static,
<U as Encoder>::Error: std::fmt::Debug + 'static,
{
type Item = ();
type Error = FramedTransportError<S::Error, U>;
@@ -391,7 +371,7 @@ where
}
}
TransportState::Error(err) => {
if self.poll_response() || self.flushed {
if self.poll_response() || !self.framed.is_write_buf_empty() {
Err(err)
} else {
self.state = TransportState::Error(err);