1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

refactor h1 stream polling

This commit is contained in:
Nikolay Kim 2018-02-09 22:26:48 -08:00
parent b4b5c78b51
commit 0c98775b51
5 changed files with 164 additions and 169 deletions

View File

@ -40,7 +40,8 @@ impl fmt::Debug for PayloadItem {
/// Buffered stream of bytes chunks /// Buffered stream of bytes chunks
/// ///
/// Payload stores chunks in a vector. First chunk can be received with `.readany()` method. /// Payload stores chunks in a vector. First chunk can be received with `.readany()` method.
/// Payload stream is not thread safe. /// Payload stream is not thread safe. Payload does not notify current task when
/// new data is available.
/// ///
/// Payload stream can be used as `HttpResponse` body stream. /// Payload stream can be used as `HttpResponse` body stream.
#[derive(Debug)] #[derive(Debug)]
@ -148,7 +149,7 @@ impl Stream for Payload {
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> { fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
self.inner.borrow_mut().readany() self.inner.borrow_mut().readany(false)
} }
} }
@ -166,7 +167,7 @@ impl Stream for ReadAny {
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Bytes>, Self::Error> { fn poll(&mut self) -> Poll<Option<Bytes>, Self::Error> {
match self.0.borrow_mut().readany()? { match self.0.borrow_mut().readany(false)? {
Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))), Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))),
Async::Ready(None) => Ok(Async::Ready(None)), Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
@ -182,7 +183,7 @@ impl Future for ReadExactly {
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readexactly(self.1)? { match self.0.borrow_mut().readexactly(self.1, false)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)), Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
} }
@ -197,7 +198,7 @@ impl Future for ReadLine {
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readline()? { match self.0.borrow_mut().readline(false)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)), Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
} }
@ -212,7 +213,7 @@ impl Future for ReadUntil {
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readuntil(&self.1)? { match self.0.borrow_mut().readuntil(&self.1, false)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)), Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
} }
@ -324,7 +325,7 @@ impl Inner {
self.len self.len
} }
fn readany(&mut self) -> Poll<Option<PayloadItem>, PayloadError> { fn readany(&mut self, notify: bool) -> Poll<Option<PayloadItem>, PayloadError> {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
self.len -= data.len(); self.len -= data.len();
Ok(Async::Ready(Some(PayloadItem(data)))) Ok(Async::Ready(Some(PayloadItem(data))))
@ -333,12 +334,14 @@ impl Inner {
} else if self.eof { } else if self.eof {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
self.task = Some(current_task()); if notify {
self.task = Some(current_task());
}
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
fn readexactly(&mut self, size: usize) -> Result<Async<Bytes>, PayloadError> { fn readexactly(&mut self, size: usize, notify: bool) -> Result<Async<Bytes>, PayloadError> {
if size <= self.len { if size <= self.len {
let mut buf = BytesMut::with_capacity(size); let mut buf = BytesMut::with_capacity(size);
while buf.len() < size { while buf.len() < size {
@ -356,12 +359,14 @@ impl Inner {
if let Some(err) = self.err.take() { if let Some(err) = self.err.take() {
Err(err) Err(err)
} else { } else {
self.task = Some(current_task()); if notify {
self.task = Some(current_task());
}
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
fn readuntil(&mut self, line: &[u8]) -> Result<Async<Bytes>, PayloadError> { fn readuntil(&mut self, line: &[u8], notify: bool) -> Result<Async<Bytes>, PayloadError> {
let mut idx = 0; let mut idx = 0;
let mut num = 0; let mut num = 0;
let mut offset = 0; let mut offset = 0;
@ -411,13 +416,15 @@ impl Inner {
if let Some(err) = self.err.take() { if let Some(err) = self.err.take() {
Err(err) Err(err)
} else { } else {
self.task = Some(current_task()); if notify {
self.task = Some(current_task());
}
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
fn readline(&mut self) -> Result<Async<Bytes>, PayloadError> { fn readline(&mut self, notify: bool) -> Result<Async<Bytes>, PayloadError> {
self.readuntil(b"\n") self.readuntil(b"\n", notify)
} }
pub fn readall(&mut self) -> Option<Bytes> { pub fn readall(&mut self) -> Option<Bytes> {

View File

@ -96,8 +96,6 @@ impl<T, H> Http1<T, H>
} }
} }
// TODO: refactor
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
pub fn poll(&mut self) -> Poll<(), ()> { pub fn poll(&mut self) -> Poll<(), ()> {
// keep-alive timer // keep-alive timer
if let Some(ref mut timer) = self.keepalive_timer { if let Some(ref mut timer) = self.keepalive_timer {
@ -111,99 +109,19 @@ impl<T, H> Http1<T, H>
} }
} }
loop { self.poll_io()
let mut not_ready = true; }
// check in-flight messages // TODO: refactor
let mut io = false; pub fn poll_io(&mut self) -> Poll<(), ()> {
let mut idx = 0; // read incoming data
while idx < self.tasks.len() { let need_read =
let item = &mut self.tasks[idx]; if !self.flags.contains(Flags::ERROR) && self.tasks.len() < MAX_PIPELINED_MESSAGES
{
if !io && !item.flags.contains(EntryFlags::EOF) { 'outer: loop {
if item.flags.contains(EntryFlags::ERROR) {
// check stream state
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
match item.pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
not_ready = false;
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
self.stream.reset();
item.flags.insert(EntryFlags::EOF);
if ready {
item.flags.insert(EntryFlags::FINISHED);
}
},
// no more IO for this iteration
Ok(Async::NotReady) => io = true,
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
}
} else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
not_ready = false;
item.flags.insert(EntryFlags::FINISHED);
},
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err);
}
}
}
idx += 1;
}
// cleanup finished tasks
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF) &&
self.tasks[0].flags.contains(EntryFlags::FINISHED)
{
self.tasks.pop_front();
} else {
break
}
}
// no keep-alive
if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() {
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
}
return Ok(Async::Ready(()))
}
// read incoming data
while !self.flags.contains(Flags::ERROR) && self.tasks.len() < MAX_PIPELINED_MESSAGES {
match self.reader.parse(self.stream.get_mut(), match self.reader.parse(self.stream.get_mut(),
&mut self.read_buf, &self.settings) { &mut self.read_buf, &self.settings) {
Ok(Async::Ready(mut req)) => { Ok(Async::Ready(mut req)) => {
not_ready = false;
// set remote addr // set remote addr
req.set_peer_addr(self.addr); req.set_peer_addr(self.addr);
@ -211,58 +129,24 @@ impl<T, H> Http1<T, H>
self.keepalive_timer.take(); self.keepalive_timer.take();
// start request processing // start request processing
let mut pipe = None;
for h in self.settings.handlers().iter_mut() { for h in self.settings.handlers().iter_mut() {
req = match h.handle(req) { req = match h.handle(req) {
Ok(t) => { Ok(t) => {
pipe = Some(t); self.tasks.push_back(
break Entry {pipe: t, flags: EntryFlags::empty()});
continue 'outer
}, },
Err(req) => req, Err(req) => req,
} }
} }
self.tasks.push_back( self.tasks.push_back(
Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), Entry {pipe: Pipeline::error(HTTPNotFound),
flags: EntryFlags::empty()}); flags: EntryFlags::empty()});
continue
}, },
Ok(Async::NotReady) => { Ok(Async::NotReady) => (),
// start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() {
if self.settings.keep_alive_enabled() {
let keep_alive = self.settings.keep_alive();
if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut to = Timeout::new(
Duration::new(keep_alive, 0),
Arbiter::handle()).unwrap();
// register timeout
let _ = to.poll();
self.keepalive_timer = Some(to);
}
} else {
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
}
// keep-alive disable, drop connection
return Ok(Async::Ready(()))
}
} else if !self.poll_completed(false)? ||
self.flags.contains(Flags::KEEPALIVE)
{
// check stream state or
// if keep-alive unset, rely on operating system
return Ok(Async::NotReady)
} else {
return Ok(Async::Ready(()))
}
}
break
},
Err(ReaderError::Disconnect) => { Err(ReaderError::Disconnect) => {
not_ready = false;
self.flags.insert(Flags::ERROR); self.flags.insert(Flags::ERROR);
self.stream.disconnected(); self.stream.disconnected();
for entry in &mut self.tasks { for entry in &mut self.tasks {
@ -271,7 +155,6 @@ impl<T, H> Http1<T, H>
}, },
Err(err) => { Err(err) => {
// notify all tasks // notify all tasks
not_ready = false;
self.stream.disconnected(); self.stream.disconnected();
for entry in &mut self.tasks { for entry in &mut self.tasks {
entry.pipe.disconnected() entry.pipe.disconnected()
@ -293,20 +176,132 @@ impl<T, H> Http1<T, H>
} }
}, },
} }
break
}
false
} else {
true
};
loop {
// check in-flight messages
let mut io = false;
let mut idx = 0;
while idx < self.tasks.len() {
let item = &mut self.tasks[idx];
if !io && !item.flags.contains(EntryFlags::EOF) {
// io is corrupted, send buffer
if item.flags.contains(EntryFlags::ERROR) {
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
match item.pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
if ready {
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::FINISHED);
}
},
// no more IO for this iteration
Ok(Async::NotReady) => io = true,
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
}
} else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED),
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err);
}
}
}
idx += 1;
} }
// check for parse error // cleanup finished tasks
if self.tasks.is_empty() { let mut popped = false;
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF | EntryFlags::FINISHED) {
popped = true;
self.tasks.pop_front();
} else {
break
}
}
if need_read && popped {
return self.poll_io()
}
// no keep-alive
if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() {
// check stream state // check stream state
if !self.poll_completed(true)? { if !self.poll_completed(true)? {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
if self.flags.contains(Flags::ERROR) || self.keepalive_timer.is_none() { return Ok(Async::Ready(()))
return Ok(Async::Ready(()))
}
} }
if not_ready { // start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() {
// check stream state
if self.flags.contains(Flags::ERROR) {
return Ok(Async::Ready(()))
}
if self.settings.keep_alive_enabled() {
let keep_alive = self.settings.keep_alive();
if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut to = Timeout::new(
Duration::new(keep_alive, 0), Arbiter::handle()).unwrap();
// register timeout
let _ = to.poll();
self.keepalive_timer = Some(to);
}
} else {
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
}
// keep-alive is disabled, drop connection
return Ok(Async::Ready(()))
}
} else if !self.poll_completed(false)? ||
self.flags.contains(Flags::KEEPALIVE) {
// check stream state or
// if keep-alive unset, rely on operating system
return Ok(Async::NotReady)
} else {
return Ok(Async::Ready(()))
}
} else {
self.poll_completed(false)?; self.poll_completed(false)?;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
@ -344,7 +339,7 @@ impl Reader {
#[inline] #[inline]
fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo) fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo)
-> std::result::Result<Decoding, ReaderError> -> Result<Decoding, ReaderError>
{ {
loop { loop {
match payload.decoder.decode(buf) { match payload.decoder.decode(buf) {
@ -416,15 +411,10 @@ impl Reader {
// if buf is empty parse_message will always return NotReady, let's avoid that // if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() { let read = if buf.is_empty() {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect),
// debug!("Ignored premature client disconnection");
return Err(ReaderError::Disconnect);
},
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => Ok(Async::NotReady) => return Ok(Async::NotReady),
return Ok(Async::NotReady), Err(err) => return Err(ReaderError::Error(err.into()))
Err(err) =>
return Err(ReaderError::Error(err.into()))
} }
false false
} else { } else {
@ -455,10 +445,8 @@ impl Reader {
return Err(ReaderError::Disconnect); return Err(ReaderError::Disconnect);
}, },
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => Ok(Async::NotReady) => return Ok(Async::NotReady),
return Ok(Async::NotReady), Err(err) => return Err(ReaderError::Error(err.into())),
Err(err) =>
return Err(ReaderError::Error(err.into()))
} }
} else { } else {
return Ok(Async::NotReady) return Ok(Async::NotReady)

View File

@ -139,8 +139,8 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
/// Send text frame /// Send text frame
#[inline] #[inline]
pub fn text(&mut self, text: &str) { pub fn text<T: Into<String>>(&mut self, text: T) {
self.write(Frame::message(Vec::from(text), OpCode::Text, true).generate(false)); self.write(Frame::message(text.into(), OpCode::Text, true).generate(false));
} }
/// Send binary frame /// Send binary frame

View File

@ -30,7 +30,7 @@
//! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { //! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
//! match msg { //! match msg {
//! ws::Message::Ping(msg) => ctx.pong(&msg), //! ws::Message::Ping(msg) => ctx.pong(&msg),
//! ws::Message::Text(text) => ctx.text(&text), //! ws::Message::Text(text) => ctx.text(text),
//! ws::Message::Binary(bin) => ctx.binary(bin), //! ws::Message::Binary(bin) => ctx.binary(bin),
//! _ => (), //! _ => (),
//! } //! }

View File

@ -22,7 +22,7 @@ impl Handler<ws::Message> for Ws {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg { match msg {
ws::Message::Ping(msg) => ctx.pong(&msg), ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(&text), ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin), ws::Message::Binary(bin) => ctx.binary(bin),
_ => (), _ => (),
} }