From 9f9c75d832c3f6c0da9ea8197758eabdb61deb97 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 24 Dec 2017 11:58:09 -0800 Subject: [PATCH] simplify drain feature --- examples/basic.rs | 2 +- src/context.rs | 36 ++++++----------------- src/encoding.rs | 2 +- src/pipeline.rs | 73 ++++++----------------------------------------- 4 files changed, 18 insertions(+), 95 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index 009a01a1..42d29e8a 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -92,7 +92,7 @@ fn main() { .header("LOCATION", "/index.html") .finish() }))) - .bind("127.0.0.1:8080").unwrap() + .bind("0.0.0.0:8080").unwrap() .start(); println!("Starting http server: 127.0.0.1:8080"); diff --git a/src/context.rs b/src/context.rs index c9f77014..f0e80de1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,10 +1,8 @@ use std; -use std::rc::Rc; -use std::cell::RefCell; use std::collections::VecDeque; -use std::marker::PhantomData; -use futures::{Async, Future, Poll}; +use futures::{Async, Poll}; use futures::sync::oneshot::Sender; +use futures::unsync::oneshot; use actix::{Actor, ActorState, ActorContext, AsyncContext, Handler, Subscriber, ResponseType}; @@ -16,7 +14,7 @@ use body::{Body, Binary}; use error::Error; use httprequest::HttpRequest; use httpresponse::HttpResponse; -use pipeline::DrainFut; + pub(crate) trait IoContext: 'static { fn disconnected(&mut self); @@ -27,7 +25,7 @@ pub(crate) trait IoContext: 'static { pub(crate) enum Frame { Message(HttpResponse), Payload(Option), - Drain(Rc>), + Drain(oneshot::Sender<()>), } /// Http actor execution context @@ -161,11 +159,11 @@ impl HttpContext where A: Actor { } /// Returns drain future - pub fn drain(&mut self) -> Drain { - let fut = Rc::new(RefCell::new(DrainFut::default())); - self.stream.push_back(Frame::Drain(Rc::clone(&fut))); + pub fn drain(&mut self) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); self.modified = true; - Drain{ a: PhantomData, inner: fut } + self.stream.push_back(Frame::Drain(tx)); + rx } /// Check if connection still open @@ -305,21 +303,3 @@ impl ToEnvelope for HttpContext RemoteEnvelope::new(msg, tx).into() } } - - -pub struct Drain { - a: PhantomData, - inner: Rc> -} - -impl ActorFuture for Drain - where A: Actor -{ - type Item = (); - type Error = (); - type Actor = A; - - fn poll(&mut self, _: &mut A, _: &mut ::Context) -> Poll<(), ()> { - self.inner.borrow_mut().poll() - } -} diff --git a/src/encoding.rs b/src/encoding.rs index 291cb8e6..3254f640 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -394,7 +394,7 @@ impl PayloadEncoder { }, Body::Binary(ref mut bytes) => { if compression { - let mut buf = SharedBytes::default(); + let buf = SharedBytes::default(); let transfer = TransferEncoding::eof(buf.clone()); let mut enc = match encoding { ContentEncoding::Deflate => ContentEncoder::Deflate( diff --git a/src/pipeline.rs b/src/pipeline.rs index c04968dc..71b3e7af 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,10 +1,9 @@ use std::{io, mem}; use std::rc::Rc; -use std::cell::RefCell; use std::marker::PhantomData; use futures::{Async, Poll, Future, Stream}; -use futures::task::{Task as FutureTask, current as current_task}; +use futures::unsync::oneshot; use channel::HttpHandlerTask; use body::{Body, BodyStream}; @@ -76,49 +75,6 @@ enum PipelineResponse { Response(Box>), } -/// Future that resolves when all buffered data get sent -#[doc(hidden)] -#[derive(Debug)] -pub struct DrainFut { - drained: bool, - task: Option, -} - -impl Default for DrainFut { - - fn default() -> DrainFut { - DrainFut { - drained: false, - task: None, - } - } -} - -impl DrainFut { - - fn set(&mut self) { - self.drained = true; - if let Some(task) = self.task.take() { - task.notify() - } - } -} - -impl Future for DrainFut { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - if self.drained { - Ok(Async::Ready(())) - } else { - self.task = Some(current_task()); - Ok(Async::NotReady) - } - } -} - - impl Pipeline { pub fn new(req: HttpRequest, @@ -554,7 +510,7 @@ struct ProcessResponse { resp: HttpResponse, iostate: IOState, running: RunningState, - drain: DrainVec, + drain: Option>, _s: PhantomData, } @@ -587,16 +543,6 @@ enum IOState { Done, } -struct DrainVec(Vec>>); - -impl Drop for DrainVec { - fn drop(&mut self) { - for drain in &mut self.0 { - drain.borrow_mut().set() - } - } -} - impl ProcessResponse { #[inline] @@ -606,14 +552,14 @@ impl ProcessResponse { ProcessResponse{ resp: resp, iostate: IOState::Response, running: RunningState::Running, - drain: DrainVec(Vec::new()), + drain: None, _s: PhantomData}) } fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo) -> Result, PipelineState> { - if self.drain.0.is_empty() && self.running != RunningState::Paused { + if self.drain.is_none() && self.running != RunningState::Paused { // if task is paused, write buffer is probably full loop { @@ -712,7 +658,7 @@ impl ProcessResponse { } }, Frame::Drain(fut) => { - self.drain.0.push(fut); + self.drain = Some(fut); break } } @@ -748,7 +694,7 @@ impl ProcessResponse { } // flush io but only if we need to - if self.running == RunningState::Paused || !self.drain.0.is_empty() { + if self.running == RunningState::Paused || self.drain.is_some() { match io.poll_completed() { Ok(Async::Ready(_)) => self.running.resume(), @@ -763,11 +709,8 @@ impl ProcessResponse { } // drain futures - if !self.drain.0.is_empty() { - for fut in &mut self.drain.0 { - fut.borrow_mut().set() - } - self.drain.0.clear(); + if let Some(tx) = self.drain.take() { + let _ = tx.send(()); } // response is completed