From edd22bb2790fb18ebce07d79996a99a1ec44d821 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 22 Jun 2018 09:01:20 +0600 Subject: [PATCH] refactor read_from_io --- src/client/parser.rs | 8 ++++---- src/server/channel.rs | 4 ++-- src/server/mod.rs | 27 +++++++++++++++++++++++++-- src/server/utils.rs | 31 ------------------------------- 4 files changed, 31 insertions(+), 39 deletions(-) delete mode 100644 src/server/utils.rs diff --git a/src/client/parser.rs b/src/client/parser.rs index b292e8d42..1638d8ebc 100644 --- a/src/client/parser.rs +++ b/src/client/parser.rs @@ -8,7 +8,7 @@ use std::mem; use error::{ParseError, PayloadError}; use server::h1decoder::EncodingDecoder; -use server::{utils, IoStream}; +use server::IoStream; use super::response::ClientMessage; use super::ClientResponse; @@ -39,7 +39,7 @@ impl HttpResponseParser { { // if buf is empty parse_message will always return NotReady, let's avoid that if buf.is_empty() { - match utils::read_from_io(io, buf) { + match io.read_available(buf) { Ok(Async::Ready(0)) => return Err(HttpResponseParserError::Disconnect), Ok(Async::Ready(_)) => (), Ok(Async::NotReady) => return Ok(Async::NotReady), @@ -59,7 +59,7 @@ impl HttpResponseParser { if buf.capacity() >= MAX_BUFFER_SIZE { return Err(HttpResponseParserError::Error(ParseError::TooLarge)); } - match utils::read_from_io(io, buf) { + match io.read_available(buf) { Ok(Async::Ready(0)) => { return Err(HttpResponseParserError::Disconnect) } @@ -83,7 +83,7 @@ impl HttpResponseParser { if self.decoder.is_some() { loop { // read payload - let (not_ready, stream_finished) = match utils::read_from_io(io, buf) { + let (not_ready, stream_finished) = match io.read_available(buf) { Ok(Async::Ready(0)) => (false, true), Err(err) => return Err(err.into()), Ok(Async::NotReady) => (true, false), diff --git a/src/server/channel.rs b/src/server/channel.rs index d236963b5..260613520 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -7,7 +7,7 @@ use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use super::settings::WorkerSettings; -use super::{h1, h2, utils, HttpHandler, IoStream}; +use super::{h1, h2, HttpHandler, IoStream}; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; @@ -139,7 +139,7 @@ where ref mut io, ref mut buf, )) => { - match utils::read_from_io(io, buf) { + match io.read_available(buf) { Ok(Async::Ready(0)) | Err(_) => { debug!("Ignored premature client disconnection"); settings.remove_channel(); diff --git a/src/server/mod.rs b/src/server/mod.rs index b65fc3a86..c98579d0b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,7 +2,7 @@ use std::net::Shutdown; use std::{io, time}; -use bytes::BytesMut; +use bytes::{BufMut, BytesMut}; use futures::{Async, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::TcpStream; @@ -18,7 +18,6 @@ pub(crate) mod helpers; pub(crate) mod settings; pub(crate) mod shared; mod srv; -pub(crate) mod utils; mod worker; pub use self::settings::ServerSettings; @@ -37,6 +36,9 @@ use httpresponse::HttpResponse; /// max buffer size 64k pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536; +const LW_BUFFER_SIZE: usize = 4096; +const HW_BUFFER_SIZE: usize = 32_768; + /// Create new http server with application factory. /// /// This is shortcut for `server::HttpServer::new()` method. @@ -213,6 +215,27 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static { fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>; fn set_linger(&mut self, dur: Option) -> io::Result<()>; + + fn read_available(&mut self, buf: &mut BytesMut) -> Poll { + unsafe { + if buf.remaining_mut() < LW_BUFFER_SIZE { + buf.reserve(HW_BUFFER_SIZE); + } + match self.read(buf.bytes_mut()) { + Ok(n) => { + buf.advance_mut(n); + Ok(Async::Ready(n)) + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + Ok(Async::NotReady) + } else { + Err(e) + } + } + } + } + } } impl IoStream for TcpStream { diff --git a/src/server/utils.rs b/src/server/utils.rs deleted file mode 100644 index e0e7e7f62..000000000 --- a/src/server/utils.rs +++ /dev/null @@ -1,31 +0,0 @@ -use bytes::{BufMut, BytesMut}; -use futures::{Async, Poll}; -use std::io; - -use super::IoStream; - -const LW_BUFFER_SIZE: usize = 4096; -const HW_BUFFER_SIZE: usize = 32_768; - -pub fn read_from_io( - io: &mut T, buf: &mut BytesMut, -) -> Poll { - unsafe { - if buf.remaining_mut() < LW_BUFFER_SIZE { - buf.reserve(HW_BUFFER_SIZE); - } - match io.read(buf.bytes_mut()) { - Ok(n) => { - buf.advance_mut(n); - Ok(Async::Ready(n)) - } - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - Ok(Async::NotReady) - } else { - Err(e) - } - } - } - } -}