1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

refactor read_from_io

This commit is contained in:
Nikolay Kim 2018-06-22 09:01:20 +06:00
parent 17c033030b
commit edd22bb279
4 changed files with 31 additions and 39 deletions

View File

@ -8,7 +8,7 @@ use std::mem;
use error::{ParseError, PayloadError}; use error::{ParseError, PayloadError};
use server::h1decoder::EncodingDecoder; use server::h1decoder::EncodingDecoder;
use server::{utils, IoStream}; use server::IoStream;
use super::response::ClientMessage; use super::response::ClientMessage;
use super::ClientResponse; use super::ClientResponse;
@ -39,7 +39,7 @@ impl HttpResponseParser {
{ {
// if buf is empty parse_message will always return NotReady, let's avoid that // if buf is empty parse_message will always return NotReady, let's avoid that
if buf.is_empty() { if buf.is_empty() {
match utils::read_from_io(io, buf) { match io.read_available(buf) {
Ok(Async::Ready(0)) => return Err(HttpResponseParserError::Disconnect), Ok(Async::Ready(0)) => return Err(HttpResponseParserError::Disconnect),
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
@ -59,7 +59,7 @@ impl HttpResponseParser {
if buf.capacity() >= MAX_BUFFER_SIZE { if buf.capacity() >= MAX_BUFFER_SIZE {
return Err(HttpResponseParserError::Error(ParseError::TooLarge)); return Err(HttpResponseParserError::Error(ParseError::TooLarge));
} }
match utils::read_from_io(io, buf) { match io.read_available(buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
return Err(HttpResponseParserError::Disconnect) return Err(HttpResponseParserError::Disconnect)
} }
@ -83,7 +83,7 @@ impl HttpResponseParser {
if self.decoder.is_some() { if self.decoder.is_some() {
loop { loop {
// read payload // read payload
let (not_ready, stream_finished) = match utils::read_from_io(io, buf) { let (not_ready, stream_finished) = match io.read_available(buf) {
Ok(Async::Ready(0)) => (false, true), Ok(Async::Ready(0)) => (false, true),
Err(err) => return Err(err.into()), Err(err) => return Err(err.into()),
Ok(Async::NotReady) => (true, false), Ok(Async::NotReady) => (true, false),

View File

@ -7,7 +7,7 @@ use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{h1, h2, utils, HttpHandler, IoStream}; use super::{h1, h2, HttpHandler, IoStream};
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
@ -139,7 +139,7 @@ where
ref mut io, ref mut io,
ref mut buf, ref mut buf,
)) => { )) => {
match utils::read_from_io(io, buf) { match io.read_available(buf) {
Ok(Async::Ready(0)) | Err(_) => { Ok(Async::Ready(0)) | Err(_) => {
debug!("Ignored premature client disconnection"); debug!("Ignored premature client disconnection");
settings.remove_channel(); settings.remove_channel();

View File

@ -2,7 +2,7 @@
use std::net::Shutdown; use std::net::Shutdown;
use std::{io, time}; use std::{io, time};
use bytes::BytesMut; use bytes::{BufMut, BytesMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
@ -18,7 +18,6 @@ pub(crate) mod helpers;
pub(crate) mod settings; pub(crate) mod settings;
pub(crate) mod shared; pub(crate) mod shared;
mod srv; mod srv;
pub(crate) mod utils;
mod worker; mod worker;
pub use self::settings::ServerSettings; pub use self::settings::ServerSettings;
@ -37,6 +36,9 @@ use httpresponse::HttpResponse;
/// max buffer size 64k /// max buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536; pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
const LW_BUFFER_SIZE: usize = 4096;
const HW_BUFFER_SIZE: usize = 32_768;
/// Create new http server with application factory. /// Create new http server with application factory.
/// ///
/// This is shortcut for `server::HttpServer::new()` method. /// This is shortcut for `server::HttpServer::new()` method.
@ -213,6 +215,27 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>; fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>;
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>; fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<usize, io::Error> {
unsafe {
if buf.remaining_mut() < LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE);
}
match self.read(buf.bytes_mut()) {
Ok(n) => {
buf.advance_mut(n);
Ok(Async::Ready(n))
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(Async::NotReady)
} else {
Err(e)
}
}
}
}
}
} }
impl IoStream for TcpStream { impl IoStream for TcpStream {

View File

@ -1,31 +0,0 @@
use bytes::{BufMut, BytesMut};
use futures::{Async, Poll};
use std::io;
use super::IoStream;
const LW_BUFFER_SIZE: usize = 4096;
const HW_BUFFER_SIZE: usize = 32_768;
pub fn read_from_io<T: IoStream>(
io: &mut T, buf: &mut BytesMut,
) -> Poll<usize, io::Error> {
unsafe {
if buf.remaining_mut() < LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE);
}
match io.read(buf.bytes_mut()) {
Ok(n) => {
buf.advance_mut(n);
Ok(Async::Ready(n))
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(Async::NotReady)
} else {
Err(e)
}
}
}
}
}