diff --git a/src/httpmessage.rs b/src/httpmessage.rs index 82c50d77..1ce04b47 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -3,7 +3,7 @@ use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; use encoding::types::{DecoderTrap, Encoding}; use encoding::EncodingRef; -use futures::{Future, Poll, Stream, Async}; +use futures::{Async, Future, Poll, Stream}; use http::{header, HeaderMap}; use http_range::HttpRange; use mime::Mime; @@ -12,8 +12,8 @@ use serde_urlencoded; use std::str; use error::{ - ContentTypeError, HttpRangeError, ParseError, PayloadError, UrlencodedError, - ReadlinesError + ContentTypeError, HttpRangeError, ParseError, PayloadError, ReadlinesError, + UrlencodedError, }; use header::Header; use json::JsonBody; @@ -261,7 +261,7 @@ pub trait HttpMessage { let boundary = Multipart::boundary(self.headers()); Multipart::new(boundary, self) } - + /// Return stream of lines. fn readlines(self) -> Readlines where @@ -295,7 +295,7 @@ where checked_buff: true, } } - + /// Change max line size. By default max size is 256Kb pub fn limit(mut self, limit: usize) -> Self { self.limit = limit; @@ -309,31 +309,31 @@ where { type Item = String; type Error = ReadlinesError; - + fn poll(&mut self) -> Poll, Self::Error> { let encoding = self.req.encoding()?; // check if there is a newline in the buffer if !self.checked_buff { let mut found: Option = None; for (ind, b) in self.buff.iter().enumerate() { - if *b == '\n' as u8 { + if *b == b'\n' { found = Some(ind); break; } } if let Some(ind) = found { // check if line is longer than limit - if ind+1 > self.limit { + if ind + 1 > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = encoding as *const Encoding; let line = if enc == UTF_8 { - str::from_utf8(&self.buff.split_to(ind+1)) + str::from_utf8(&self.buff.split_to(ind + 1)) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { encoding - .decode(&self.buff.split_to(ind+1), DecoderTrap::Strict) + .decode(&self.buff.split_to(ind + 1), DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; return Ok(Async::Ready(Some(line))); @@ -346,24 +346,24 @@ where // check if there is a newline in bytes let mut found: Option = None; for (ind, b) in bytes.iter().enumerate() { - if *b == '\n' as u8 { + if *b == b'\n' { found = Some(ind); break; } } if let Some(ind) = found { // check if line is longer than limit - if ind+1 > self.limit { + if ind + 1 > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = encoding as *const Encoding; let line = if enc == UTF_8 { - str::from_utf8(&bytes.split_to(ind+1)) + str::from_utf8(&bytes.split_to(ind + 1)) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { encoding - .decode(&bytes.split_to(ind+1), DecoderTrap::Strict) + .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; // extend buffer with rest of the bytes; @@ -373,10 +373,10 @@ where } self.buff.extend_from_slice(&bytes); Ok(Async::NotReady) - }, + } Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => { - if self.buff.len() == 0 { + if self.buff.is_empty() { return Ok(Async::Ready(None)); } if self.buff.len() > self.limit { @@ -393,8 +393,8 @@ where .map_err(|_| ReadlinesError::EncodingError)? }; self.buff.clear(); - return Ok(Async::Ready(Some(line))) - }, + Ok(Async::Ready(Some(line))) + } Err(e) => Err(ReadlinesError::from(e)), } } @@ -805,29 +805,35 @@ mod tests { _ => unreachable!("error"), } } - + #[test] fn test_readlines() { let mut req = HttpRequest::default(); req.payload_mut().unread_data(Bytes::from_static( b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\ industry. Lorem Ipsum has been the industry's standard dummy\n\ - Contrary to popular belief, Lorem Ipsum is not simply random text." + Contrary to popular belief, Lorem Ipsum is not simply random text.", )); let mut r = Readlines::new(req); match r.poll().ok().unwrap() { - Async::Ready(Some(s)) => assert_eq!(s, - "Lorem Ipsum is simply dummy text of the printing and typesetting\n"), + Async::Ready(Some(s)) => assert_eq!( + s, + "Lorem Ipsum is simply dummy text of the printing and typesetting\n" + ), _ => unreachable!("error"), } match r.poll().ok().unwrap() { - Async::Ready(Some(s)) => assert_eq!(s, - "industry. Lorem Ipsum has been the industry's standard dummy\n"), + Async::Ready(Some(s)) => assert_eq!( + s, + "industry. Lorem Ipsum has been the industry's standard dummy\n" + ), _ => unreachable!("error"), } match r.poll().ok().unwrap() { - Async::Ready(Some(s)) => assert_eq!(s, - "Contrary to popular belief, Lorem Ipsum is not simply random text."), + Async::Ready(Some(s)) => assert_eq!( + s, + "Contrary to popular belief, Lorem Ipsum is not simply random text." + ), _ => unreachable!("error"), } } diff --git a/src/server/srv.rs b/src/server/srv.rs index c1cf0a18..897dbb09 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -615,54 +615,51 @@ impl Handler for HttpServer { /// Commands from accept threads impl StreamHandler for HttpServer { fn handle(&mut self, msg: Result, ()>, _: &mut Context) { - match msg { - Ok(Some(ServerCommand::WorkerDied(idx, socks))) => { - let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - let (tx, rx) = mpsc::unbounded::>(); - - let mut new_idx = self.workers.len(); - 'found: loop { - for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let ka = self.keep_alive; - let factory = Arc::clone(&self.factory); - let settings = - ServerSettings::new(Some(socks[0].addr), &self.host, false); - - let addr = Arbiter::start(move |ctx: &mut Context<_>| { - let apps: Vec<_> = (*factory)() - .into_iter() - .map(|h| h.into_handler(settings.clone())) - .collect(); - ctx.add_message_stream(rx); - Worker::new(apps, socks, ka) - }); - for item in &self.accept { - let _ = item.1.send(Command::Worker(new_idx, tx.clone())); - let _ = item.0.set_readiness(mio::Ready::readable()); - } - - self.workers.push((new_idx, addr)); + if let Ok(Some(ServerCommand::WorkerDied(idx, socks))) = msg { + let mut found = false; + for i in 0..self.workers.len() { + if self.workers[i].0 == idx { + self.workers.swap_remove(i); + found = true; + break; } } - _ => (), + + if found { + error!("Worker has died {:?}, restarting", idx); + let (tx, rx) = mpsc::unbounded::>(); + + let mut new_idx = self.workers.len(); + 'found: loop { + for i in 0..self.workers.len() { + if self.workers[i].0 == new_idx { + new_idx += 1; + continue 'found; + } + } + break; + } + + let ka = self.keep_alive; + let factory = Arc::clone(&self.factory); + let settings = + ServerSettings::new(Some(socks[0].addr), &self.host, false); + + let addr = Arbiter::start(move |ctx: &mut Context<_>| { + let apps: Vec<_> = (*factory)() + .into_iter() + .map(|h| h.into_handler(settings.clone())) + .collect(); + ctx.add_message_stream(rx); + Worker::new(apps, socks, ka) + }); + for item in &self.accept { + let _ = item.1.send(Command::Worker(new_idx, tx.clone())); + let _ = item.0.set_readiness(mio::Ready::readable()); + } + + self.workers.push((new_idx, addr)); + } } } }