1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-12-18 01:43:58 +01:00

simplify drain feature

This commit is contained in:
Nikolay Kim 2017-12-24 11:58:09 -08:00
parent eaab28cd3b
commit 9f9c75d832
4 changed files with 18 additions and 95 deletions

View File

@ -92,7 +92,7 @@ fn main() {
.header("LOCATION", "/index.html") .header("LOCATION", "/index.html")
.finish() .finish()
}))) })))
.bind("127.0.0.1:8080").unwrap() .bind("0.0.0.0:8080").unwrap()
.start(); .start();
println!("Starting http server: 127.0.0.1:8080"); println!("Starting http server: 127.0.0.1:8080");

View File

@ -1,10 +1,8 @@
use std; use std;
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::PhantomData; use futures::{Async, Poll};
use futures::{Async, Future, Poll};
use futures::sync::oneshot::Sender; use futures::sync::oneshot::Sender;
use futures::unsync::oneshot;
use actix::{Actor, ActorState, ActorContext, AsyncContext, use actix::{Actor, ActorState, ActorContext, AsyncContext,
Handler, Subscriber, ResponseType}; Handler, Subscriber, ResponseType};
@ -16,7 +14,7 @@ use body::{Body, Binary};
use error::Error; use error::Error;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use pipeline::DrainFut;
pub(crate) trait IoContext: 'static { pub(crate) trait IoContext: 'static {
fn disconnected(&mut self); fn disconnected(&mut self);
@ -27,7 +25,7 @@ pub(crate) trait IoContext: 'static {
pub(crate) enum Frame { pub(crate) enum Frame {
Message(HttpResponse), Message(HttpResponse),
Payload(Option<Binary>), Payload(Option<Binary>),
Drain(Rc<RefCell<DrainFut>>), Drain(oneshot::Sender<()>),
} }
/// Http actor execution context /// Http actor execution context
@ -161,11 +159,11 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
} }
/// Returns drain future /// Returns drain future
pub fn drain(&mut self) -> Drain<A> { pub fn drain(&mut self) -> oneshot::Receiver<()> {
let fut = Rc::new(RefCell::new(DrainFut::default())); let (tx, rx) = oneshot::channel();
self.stream.push_back(Frame::Drain(Rc::clone(&fut)));
self.modified = true; self.modified = true;
Drain{ a: PhantomData, inner: fut } self.stream.push_back(Frame::Drain(tx));
rx
} }
/// Check if connection still open /// Check if connection still open
@ -305,21 +303,3 @@ impl<A, S> ToEnvelope<A> for HttpContext<A, S>
RemoteEnvelope::new(msg, tx).into() RemoteEnvelope::new(msg, tx).into()
} }
} }
pub struct Drain<A> {
a: PhantomData<A>,
inner: Rc<RefCell<DrainFut>>
}
impl<A> ActorFuture for Drain<A>
where A: Actor
{
type Item = ();
type Error = ();
type Actor = A;
fn poll(&mut self, _: &mut A, _: &mut <Self::Actor as Actor>::Context) -> Poll<(), ()> {
self.inner.borrow_mut().poll()
}
}

View File

@ -394,7 +394,7 @@ impl PayloadEncoder {
}, },
Body::Binary(ref mut bytes) => { Body::Binary(ref mut bytes) => {
if compression { if compression {
let mut buf = SharedBytes::default(); let buf = SharedBytes::default();
let transfer = TransferEncoding::eof(buf.clone()); let transfer = TransferEncoding::eof(buf.clone());
let mut enc = match encoding { let mut enc = match encoding {
ContentEncoding::Deflate => ContentEncoder::Deflate( ContentEncoding::Deflate => ContentEncoder::Deflate(

View File

@ -1,10 +1,9 @@
use std::{io, mem}; use std::{io, mem};
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData; use std::marker::PhantomData;
use futures::{Async, Poll, Future, Stream}; use futures::{Async, Poll, Future, Stream};
use futures::task::{Task as FutureTask, current as current_task}; use futures::unsync::oneshot;
use channel::HttpHandlerTask; use channel::HttpHandlerTask;
use body::{Body, BodyStream}; use body::{Body, BodyStream};
@ -76,49 +75,6 @@ enum PipelineResponse {
Response(Box<Future<Item=HttpResponse, Error=Error>>), Response(Box<Future<Item=HttpResponse, Error=Error>>),
} }
/// Future that resolves when all buffered data get sent
#[doc(hidden)]
#[derive(Debug)]
pub struct DrainFut {
drained: bool,
task: Option<FutureTask>,
}
impl Default for DrainFut {
fn default() -> DrainFut {
DrainFut {
drained: false,
task: None,
}
}
}
impl DrainFut {
fn set(&mut self) {
self.drained = true;
if let Some(task) = self.task.take() {
task.notify()
}
}
}
impl Future for DrainFut {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if self.drained {
Ok(Async::Ready(()))
} else {
self.task = Some(current_task());
Ok(Async::NotReady)
}
}
}
impl<S> Pipeline<S> { impl<S> Pipeline<S> {
pub fn new(req: HttpRequest<S>, pub fn new(req: HttpRequest<S>,
@ -554,7 +510,7 @@ struct ProcessResponse<S> {
resp: HttpResponse, resp: HttpResponse,
iostate: IOState, iostate: IOState,
running: RunningState, running: RunningState,
drain: DrainVec, drain: Option<oneshot::Sender<()>>,
_s: PhantomData<S>, _s: PhantomData<S>,
} }
@ -587,16 +543,6 @@ enum IOState {
Done, Done,
} }
struct DrainVec(Vec<Rc<RefCell<DrainFut>>>);
impl Drop for DrainVec {
fn drop(&mut self) {
for drain in &mut self.0 {
drain.borrow_mut().set()
}
}
}
impl<S> ProcessResponse<S> { impl<S> ProcessResponse<S> {
#[inline] #[inline]
@ -606,14 +552,14 @@ impl<S> ProcessResponse<S> {
ProcessResponse{ resp: resp, ProcessResponse{ resp: resp,
iostate: IOState::Response, iostate: IOState::Response,
running: RunningState::Running, running: RunningState::Running,
drain: DrainVec(Vec::new()), drain: None,
_s: PhantomData}) _s: PhantomData})
} }
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>) fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
-> Result<PipelineState<S>, PipelineState<S>> -> Result<PipelineState<S>, PipelineState<S>>
{ {
if self.drain.0.is_empty() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full // if task is paused, write buffer is probably full
loop { loop {
@ -712,7 +658,7 @@ impl<S> ProcessResponse<S> {
} }
}, },
Frame::Drain(fut) => { Frame::Drain(fut) => {
self.drain.0.push(fut); self.drain = Some(fut);
break break
} }
} }
@ -748,7 +694,7 @@ impl<S> ProcessResponse<S> {
} }
// flush io but only if we need to // flush io but only if we need to
if self.running == RunningState::Paused || !self.drain.0.is_empty() { if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed() { match io.poll_completed() {
Ok(Async::Ready(_)) => Ok(Async::Ready(_)) =>
self.running.resume(), self.running.resume(),
@ -763,11 +709,8 @@ impl<S> ProcessResponse<S> {
} }
// drain futures // drain futures
if !self.drain.0.is_empty() { if let Some(tx) = self.drain.take() {
for fut in &mut self.drain.0 { let _ = tx.send(());
fut.borrow_mut().set()
}
self.drain.0.clear();
} }
// response is completed // response is completed