1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

* fix force_close

* shutdown io before exit

* fix response creation with body from pool
This commit is contained in:
Nikolay Kim 2018-01-03 18:21:34 -08:00
parent 8348c830e2
commit e439d0546b
9 changed files with 48 additions and 46 deletions

View File

@ -48,7 +48,7 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> {
if path.is_empty() { if path.is_empty() {
req.match_info_mut().add("tail", ""); req.match_info_mut().add("tail", "");
} else { } else {
req.match_info_mut().add("tail", path.trim_left_matches('/')); req.match_info_mut().add("tail", path.split_at(1).1);
} }
return handler.handle(req) return handler.handle(req)
} }

View File

@ -79,7 +79,9 @@ impl<T, H> HttpChannel<T, H>
} }
} }
/*impl<T: 'static, A: 'static, H: 'static> Drop for HttpChannel<T, A, H> { /*impl<T, H> Drop for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
fn drop(&mut self) { fn drop(&mut self) {
println!("Drop http channel"); println!("Drop http channel");
} }

View File

@ -14,7 +14,6 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel
use body::{Body, Binary}; use body::{Body, Binary};
use error::{Error, Result}; use error::{Error, Result};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse;
pub trait ActorHttpContext: 'static { pub trait ActorHttpContext: 'static {
@ -124,16 +123,6 @@ impl<A, S: 'static> HttpContext<A, S> where A: Actor<Context=Self> {
self.act = Some(actor); self.act = Some(actor);
self self
} }
pub fn with_actor(mut self, actor: A, mut resp: HttpResponse) -> Result<HttpResponse> {
if self.act.is_some() {
panic!("Actor is set already");
}
self.act = Some(actor);
resp.replace_body(Body::Actor(Box::new(self)));
Ok(resp)
}
} }
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> { impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {

View File

@ -97,11 +97,11 @@ impl<T, H> Http1<T, H>
(self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze())
} }
fn poll_completed(&mut self) -> Result<bool, ()> { fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> {
// check stream state // check stream state
match self.stream.poll_completed() { match self.stream.poll_completed(shutdown) {
Ok(Async::Ready(_)) => Ok(false), Ok(Async::Ready(_)) => Ok(true),
Ok(Async::NotReady) => Ok(true), Ok(Async::NotReady) => Ok(false),
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err); debug!("Error sending data: {}", err);
Err(()) Err(())
@ -136,7 +136,7 @@ impl<T, H> Http1<T, H>
if !io && !item.flags.contains(EntryFlags::EOF) { if !io && !item.flags.contains(EntryFlags::EOF) {
if item.flags.contains(EntryFlags::ERROR) { if item.flags.contains(EntryFlags::ERROR) {
// check stream state // check stream state
if let Ok(Async::NotReady) = self.stream.poll_completed() { if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
return Err(()) return Err(())
@ -147,13 +147,11 @@ impl<T, H> Http1<T, H>
not_ready = false; not_ready = false;
// overide keep-alive state // overide keep-alive state
if self.settings.keep_alive_enabled() {
if self.stream.keepalive() { if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE); self.flags.insert(Flags::KEEPALIVE);
} else { } else {
self.flags.remove(Flags::KEEPALIVE); self.flags.remove(Flags::KEEPALIVE);
} }
}
self.stream.reset(); self.stream.reset();
item.flags.insert(EntryFlags::EOF); item.flags.insert(EntryFlags::EOF);
@ -172,7 +170,7 @@ impl<T, H> Http1<T, H>
item.flags.insert(EntryFlags::ERROR); item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer // check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed() { if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
return Err(()) return Err(())
@ -207,11 +205,14 @@ impl<T, H> Http1<T, H>
// no keep-alive // no keep-alive
if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() { if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() {
let h2 = self.flags.contains(Flags::H2);
// check stream state // check stream state
if self.poll_completed()? { if !self.poll_completed(!h2)? {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
if self.flags.contains(Flags::H2) {
if h2 {
return Ok(Async::Ready(Http1Result::Switch)) return Ok(Async::Ready(Http1Result::Switch))
} else { } else {
return Ok(Async::Ready(Http1Result::Done)) return Ok(Async::Ready(Http1Result::Done))
@ -284,7 +285,7 @@ impl<T, H> Http1<T, H>
} }
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// start keep-alive timer, this is also slow request timeout // start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() { if self.tasks.is_empty() {
if self.settings.keep_alive_enabled() { if self.settings.keep_alive_enabled() {
let keep_alive = self.settings.keep_alive(); let keep_alive = self.settings.keep_alive();
@ -300,17 +301,20 @@ impl<T, H> Http1<T, H>
} }
} else { } else {
// check stream state // check stream state
if self.poll_completed()? { if !self.poll_completed(true)? {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
// keep-alive disable, drop connection // keep-alive disable, drop connection
return Ok(Async::Ready(Http1Result::Done)) return Ok(Async::Ready(Http1Result::Done))
} }
} else { } else if !self.poll_completed(false)? ||
// check stream state self.flags.contains(Flags::KEEPALIVE)
self.poll_completed()?; {
// keep-alive unset, rely on operating system // check stream state or
// if keep-alive unset, rely on operating system
return Ok(Async::NotReady) return Ok(Async::NotReady)
} else {
return Ok(Async::Ready(Http1Result::Done))
} }
} }
break break
@ -320,12 +324,13 @@ impl<T, H> Http1<T, H>
// check for parse error // check for parse error
if self.tasks.is_empty() { if self.tasks.is_empty() {
let h2 = self.flags.contains(Flags::H2);
// check stream state // check stream state
if self.poll_completed()? { if !self.poll_completed(!h2)? {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
if h2 {
if self.flags.contains(Flags::H2) {
return Ok(Async::Ready(Http1Result::Switch)) return Ok(Async::Ready(Http1Result::Switch))
} }
if self.flags.contains(Flags::ERROR) || self.keepalive_timer.is_none() { if self.flags.contains(Flags::ERROR) || self.keepalive_timer.is_none() {
@ -334,7 +339,7 @@ impl<T, H> Http1<T, H>
} }
if not_ready { if not_ready {
self.poll_completed()?; self.poll_completed(false)?;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }

View File

@ -33,7 +33,7 @@ pub trait Writer {
fn write_eof(&mut self) -> Result<WriterState, io::Error>; fn write_eof(&mut self) -> Result<WriterState, io::Error>;
fn poll_completed(&mut self) -> Poll<(), io::Error>; fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>;
} }
bitflags! { bitflags! {
@ -94,7 +94,7 @@ impl<T: AsyncWrite> H1Writer<T> {
while !buffer.is_empty() { while !buffer.is_empty() {
match self.stream.write(buffer.as_ref()) { match self.stream.write(buffer.as_ref()) {
Ok(n) => { Ok(n) => {
buffer.split_to(n); let _ = buffer.split_to(n);
}, },
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE { if buffer.len() > MAX_WRITE_BUFFER_SIZE {
@ -112,7 +112,6 @@ impl<T: AsyncWrite> H1Writer<T> {
impl<T: AsyncWrite> Writer for H1Writer<T> { impl<T: AsyncWrite> Writer for H1Writer<T> {
#[cfg_attr(feature = "cargo-clippy", allow(cast_lossless))]
fn written(&self) -> u64 { fn written(&self) -> u64 {
self.written self.written
} }
@ -218,7 +217,6 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.encoder.write_eof()?; self.encoder.write_eof()?;
if !self.encoder.is_eof() { if !self.encoder.is_eof() {
// debug!("last payload item, but it is not EOF ");
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached")) "Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { } else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {
@ -228,9 +226,15 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
} }
} }
fn poll_completed(&mut self) -> Poll<(), io::Error> { fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
match self.write_to_stream() { match self.write_to_stream() {
Ok(WriterState::Done) => Ok(Async::Ready(())), Ok(WriterState::Done) => {
if shutdown {
self.stream.shutdown()
} else {
Ok(Async::Ready(()))
}
},
Ok(WriterState::Pause) => Ok(Async::NotReady), Ok(WriterState::Pause) => Ok(Async::NotReady),
Err(err) => Err(err) Err(err) => Err(err)
} }

View File

@ -213,7 +213,7 @@ impl Writer for H2Writer {
} }
} }
fn poll_completed(&mut self) -> Poll<(), io::Error> { fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> {
match self.write_to_stream() { match self.write_to_stream() {
Ok(WriterState::Done) => Ok(Async::Ready(())), Ok(WriterState::Done) => Ok(Async::Ready(())),
Ok(WriterState::Pause) => Ok(Async::NotReady), Ok(WriterState::Pause) => Ok(Async::NotReady),

View File

@ -674,7 +674,7 @@ impl Pool {
POOL.with(|pool| { POOL.with(|pool| {
if let Some(mut resp) = pool.borrow_mut().0.pop_front() { if let Some(mut resp) = pool.borrow_mut().0.pop_front() {
resp.status = status; resp.status = status;
resp.body = Body::Empty; resp.body = body;
resp resp
} else { } else {
Box::new(InnerHttpResponse::new(status, body)) Box::new(InnerHttpResponse::new(status, body))

View File

@ -658,7 +658,7 @@ impl<S, H> ProcessResponse<S, H> {
// flush io but only if we need to // flush io but only if we need to
if self.running == RunningState::Paused || self.drain.is_some() { if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed() { match io.poll_completed(false) {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
self.running.resume(); self.running.resume();

View File

@ -189,6 +189,8 @@ impl Pattern {
} }
/// Extract pattern parameters from the text /// Extract pattern parameters from the text
// This method unsafe internally, assumption that Pattern instance lives
// longer than `req`
pub fn update_match_info<S>(&self, req: &mut HttpRequest<S>, prefix: usize) { pub fn update_match_info<S>(&self, req: &mut HttpRequest<S>, prefix: usize) {
if !self.names.is_empty() { if !self.names.is_empty() {
let text: &str = unsafe{ mem::transmute(&req.path()[prefix..]) }; let text: &str = unsafe{ mem::transmute(&req.path()[prefix..]) };