From 2710f70e394700c58dbf1951d19bd0b249fbf279 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 2 Oct 2018 17:30:29 -0700 Subject: [PATCH] add H1 transport --- src/server/channel.rs | 85 ++++++++++++++++++++++++++++------ src/server/h1.rs | 13 ++++-- src/server/h2.rs | 4 +- src/server/incoming.rs | 4 +- src/server/mod.rs | 20 ++++++-- src/server/service.rs | 87 ++++++++++++++++++++++++++++++++++- src/server/ssl/nativetls.rs | 7 ++- src/server/ssl/openssl.rs | 7 ++- src/server/ssl/rustls.rs | 7 ++- tests/test_custom_pipeline.rs | 81 ++++++++++++++++++++++++++++++++ tests/test_server.rs | 43 ----------------- 11 files changed, 284 insertions(+), 74 deletions(-) create mode 100644 tests/test_custom_pipeline.rs diff --git a/src/server/channel.rs b/src/server/channel.rs index 513601ac..cbbe1a95 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -1,4 +1,4 @@ -use std::net::{Shutdown, SocketAddr}; +use std::net::Shutdown; use std::{io, mem, time}; use bytes::{Buf, BufMut, BytesMut}; @@ -16,7 +16,7 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; pub(crate) enum HttpProtocol { H1(h1::Http1Dispatcher), H2(h2::Http2), - Unknown(ServiceConfig, Option, T, BytesMut), + Unknown(ServiceConfig, T, BytesMut), None, } @@ -29,7 +29,7 @@ impl HttpProtocol { let _ = IoStream::shutdown(io, Shutdown::Both); } HttpProtocol::H2(ref mut h2) => h2.shutdown(), - HttpProtocol::Unknown(_, _, io, _) => { + HttpProtocol::Unknown(_, io, _) => { let _ = IoStream::set_linger(io, Some(time::Duration::new(0, 0))); let _ = IoStream::shutdown(io, Shutdown::Both); } @@ -59,9 +59,7 @@ where T: IoStream, H: HttpHandler + 'static, { - pub(crate) fn new( - settings: ServiceConfig, io: T, peer: Option, - ) -> HttpChannel { + pub(crate) fn new(settings: ServiceConfig, io: T) -> HttpChannel { let ka_timeout = settings.client_timer(); HttpChannel { @@ -69,7 +67,6 @@ where node_reg: false, node: Node::new(HttpProtocol::Unknown( settings, - peer, io, BytesMut::with_capacity(8192), )), @@ -102,7 +99,7 @@ where Ok(Async::Ready(_)) => { trace!("Slow request timed out, close connection"); let proto = mem::replace(self.node.get_mut(), HttpProtocol::None); - if let HttpProtocol::Unknown(settings, _, io, buf) = proto { + if let HttpProtocol::Unknown(settings, io, buf) = proto { *self.node.get_mut() = HttpProtocol::H1(h1::Http1Dispatcher::for_error( settings, @@ -125,7 +122,7 @@ where let settings = match self.node.get_mut() { HttpProtocol::H1(ref mut h1) => h1.settings().clone(), HttpProtocol::H2(ref mut h2) => h2.settings().clone(), - HttpProtocol::Unknown(ref mut settings, _, _, _) => settings.clone(), + HttpProtocol::Unknown(ref mut settings, _, _) => settings.clone(), HttpProtocol::None => unreachable!(), }; settings.head().insert(&mut self.node); @@ -135,7 +132,7 @@ where let kind = match self.node.get_mut() { HttpProtocol::H1(ref mut h1) => return h1.poll(), HttpProtocol::H2(ref mut h2) => return h2.poll(), - HttpProtocol::Unknown(_, _, ref mut io, ref mut buf) => { + HttpProtocol::Unknown(_, ref mut io, ref mut buf) => { let mut err = None; let mut disconnect = false; match io.read_available(buf) { @@ -173,13 +170,12 @@ where // upgrade to specific http protocol let proto = mem::replace(self.node.get_mut(), HttpProtocol::None); - if let HttpProtocol::Unknown(settings, addr, io, buf) = proto { + if let HttpProtocol::Unknown(settings, io, buf) = proto { match kind { ProtocolKind::Http1 => { *self.node.get_mut() = HttpProtocol::H1(h1::Http1Dispatcher::new( settings, io, - addr, buf, is_eof, self.ka_timeout.take(), @@ -190,7 +186,6 @@ where *self.node.get_mut() = HttpProtocol::H2(h2::Http2::new( settings, io, - addr, buf.freeze(), self.ka_timeout.take(), )); @@ -202,6 +197,70 @@ where } } +#[doc(hidden)] +pub struct H1Channel +where + T: IoStream, + H: HttpHandler + 'static, +{ + node: Node>, + node_reg: bool, +} + +impl H1Channel +where + T: IoStream, + H: HttpHandler + 'static, +{ + pub(crate) fn new(settings: ServiceConfig, io: T) -> H1Channel { + H1Channel { + node_reg: false, + node: Node::new(HttpProtocol::H1(h1::Http1Dispatcher::new( + settings, + io, + BytesMut::with_capacity(8192), + false, + None, + ))), + } + } +} + +impl Drop for H1Channel +where + T: IoStream, + H: HttpHandler + 'static, +{ + fn drop(&mut self) { + self.node.remove(); + } +} + +impl Future for H1Channel +where + T: IoStream, + H: HttpHandler + 'static, +{ + type Item = (); + type Error = HttpDispatchError; + + fn poll(&mut self) -> Poll { + if !self.node_reg { + self.node_reg = true; + let settings = match self.node.get_mut() { + HttpProtocol::H1(ref mut h1) => h1.settings().clone(), + _ => unreachable!(), + }; + settings.head().insert(&mut self.node); + } + + match self.node.get_mut() { + HttpProtocol::H1(ref mut h1) => h1.poll(), + _ => unreachable!(), + } + } +} + pub(crate) struct Node { next: Option<*mut Node>, prev: Option<*mut Node>, diff --git a/src/server/h1.rs b/src/server/h1.rs index 53c4e2cf..7a59b649 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -87,9 +87,10 @@ where H: HttpHandler + 'static, { pub fn new( - settings: ServiceConfig, stream: T, addr: Option, buf: BytesMut, - is_eof: bool, keepalive_timer: Option, + settings: ServiceConfig, stream: T, buf: BytesMut, is_eof: bool, + keepalive_timer: Option, ) -> Self { + let addr = stream.peer_addr(); let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer { (delay.deadline(), Some(delay)) } else if let Some(delay) = settings.keep_alive_timer() { @@ -107,12 +108,12 @@ where }; Http1Dispatcher { - flags, stream: H1Writer::new(stream, settings.clone()), decoder: H1Decoder::new(), payload: None, tasks: VecDeque::new(), error: None, + flags, addr, buf, settings, @@ -337,9 +338,11 @@ where /// read data from the stream pub(self) fn poll_io(&mut self) -> Result { if !self.flags.contains(Flags::POLLED) { - let updated = self.parse()?; self.flags.insert(Flags::POLLED); - return Ok(updated); + if !self.buf.is_empty() { + let updated = self.parse()?; + return Ok(updated); + } } // read io from socket diff --git a/src/server/h2.rs b/src/server/h2.rs index 312b51df..2fe2fa07 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -58,9 +58,9 @@ where H: HttpHandler + 'static, { pub fn new( - settings: ServiceConfig, io: T, addr: Option, buf: Bytes, - keepalive_timer: Option, + settings: ServiceConfig, io: T, buf: Bytes, keepalive_timer: Option, ) -> Self { + let addr = io.peer_addr(); let extensions = io.extensions(); Http2 { flags: Flags::empty(), diff --git a/src/server/incoming.rs b/src/server/incoming.rs index f2bc1d8f..b13bba2a 100644 --- a/src/server/incoming.rs +++ b/src/server/incoming.rs @@ -64,8 +64,6 @@ where type Result = (); fn handle(&mut self, msg: WrapperStream, _: &mut Context) -> Self::Result { - Arbiter::spawn( - HttpChannel::new(self.settings.clone(), msg, None).map_err(|_| ()), - ); + Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg).map_err(|_| ())); } } diff --git a/src/server/mod.rs b/src/server/mod.rs index d6e9f26b..c942ff91 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -106,7 +106,7 @@ //! let _ = sys.run(); //!} //! ``` -use std::net::Shutdown; +use std::net::{Shutdown, SocketAddr}; use std::rc::Rc; use std::{io, time}; @@ -143,10 +143,13 @@ pub use self::message::Request; pub use self::ssl::*; pub use self::error::{AcceptorError, HttpDispatchError}; -pub use self::settings::{ServerSettings, ServiceConfig, ServiceConfigBuilder}; +pub use self::settings::ServerSettings; #[doc(hidden)] -pub use self::service::{HttpService, StreamConfiguration}; +pub use self::settings::{ServiceConfig, ServiceConfigBuilder}; + +#[doc(hidden)] +pub use self::service::{H1Service, HttpService, StreamConfiguration}; #[doc(hidden)] pub use self::helpers::write_content_length; @@ -266,6 +269,12 @@ pub trait Writer { pub trait IoStream: AsyncRead + AsyncWrite + 'static { fn shutdown(&mut self, how: Shutdown) -> io::Result<()>; + /// Returns the socket address of the remote peer of this TCP connection. + fn peer_addr(&self) -> Option { + None + } + + /// Sets the value of the TCP_NODELAY option on this socket. fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>; fn set_linger(&mut self, dur: Option) -> io::Result<()>; @@ -341,6 +350,11 @@ impl IoStream for TcpStream { TcpStream::shutdown(self, how) } + #[inline] + fn peer_addr(&self) -> Option { + TcpStream::peer_addr(self).ok() + } + #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { TcpStream::set_nodelay(self, nodelay) diff --git a/src/server/service.rs b/src/server/service.rs index ec71a1f1..e3402e30 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -5,7 +5,7 @@ use actix_net::service::{NewService, Service}; use futures::future::{ok, FutureResult}; use futures::{Async, Poll}; -use super::channel::HttpChannel; +use super::channel::{H1Channel, HttpChannel}; use super::error::HttpDispatchError; use super::handler::HttpHandler; use super::settings::ServiceConfig; @@ -89,7 +89,90 @@ where } fn call(&mut self, req: Self::Request) -> Self::Future { - HttpChannel::new(self.settings.clone(), req, None) + HttpChannel::new(self.settings.clone(), req) + } +} + +/// `NewService` implementation for HTTP1 transport +pub struct H1Service +where + H: HttpHandler, + Io: IoStream, +{ + settings: ServiceConfig, + _t: PhantomData, +} + +impl H1Service +where + H: HttpHandler, + Io: IoStream, +{ + /// Create new `HttpService` instance. + pub fn new(settings: ServiceConfig) -> Self { + H1Service { + settings, + _t: PhantomData, + } + } +} + +impl NewService for H1Service +where + H: HttpHandler, + Io: IoStream, +{ + type Request = Io; + type Response = (); + type Error = HttpDispatchError; + type InitError = (); + type Service = H1ServiceHandler; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(H1ServiceHandler::new(self.settings.clone())) + } +} + +/// `Service` implementation for HTTP1 transport +pub struct H1ServiceHandler +where + H: HttpHandler, + Io: IoStream, +{ + settings: ServiceConfig, + _t: PhantomData, +} + +impl H1ServiceHandler +where + H: HttpHandler, + Io: IoStream, +{ + fn new(settings: ServiceConfig) -> H1ServiceHandler { + H1ServiceHandler { + settings, + _t: PhantomData, + } + } +} + +impl Service for H1ServiceHandler +where + H: HttpHandler, + Io: IoStream, +{ + type Request = Io; + type Response = (); + type Error = HttpDispatchError; + type Future = H1Channel; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + H1Channel::new(self.settings.clone(), req) } } diff --git a/src/server/ssl/nativetls.rs b/src/server/ssl/nativetls.rs index e56b4521..a9797ffb 100644 --- a/src/server/ssl/nativetls.rs +++ b/src/server/ssl/nativetls.rs @@ -1,4 +1,4 @@ -use std::net::Shutdown; +use std::net::{Shutdown, SocketAddr}; use std::{io, time}; use actix_net::ssl::TlsStream; @@ -12,6 +12,11 @@ impl IoStream for TlsStream { Ok(()) } + #[inline] + fn peer_addr(&self) -> Option { + self.get_ref().get_ref().peer_addr() + } + #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { self.get_mut().get_mut().set_nodelay(nodelay) diff --git a/src/server/ssl/openssl.rs b/src/server/ssl/openssl.rs index f9e0e177..9d370f8b 100644 --- a/src/server/ssl/openssl.rs +++ b/src/server/ssl/openssl.rs @@ -1,4 +1,4 @@ -use std::net::Shutdown; +use std::net::{Shutdown, SocketAddr}; use std::{io, time}; use actix_net::ssl; @@ -65,6 +65,11 @@ impl IoStream for SslStream { Ok(()) } + #[inline] + fn peer_addr(&self) -> Option { + self.get_ref().get_ref().peer_addr() + } + #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { self.get_mut().get_mut().set_nodelay(nodelay) diff --git a/src/server/ssl/rustls.rs b/src/server/ssl/rustls.rs index df78d1dc..a53a53a9 100644 --- a/src/server/ssl/rustls.rs +++ b/src/server/ssl/rustls.rs @@ -1,4 +1,4 @@ -use std::net::Shutdown; +use std::net::{Shutdown, SocketAddr}; use std::{io, time}; use actix_net::ssl; //::RustlsAcceptor; @@ -65,6 +65,11 @@ impl IoStream for TlsStream { Ok(()) } + #[inline] + fn peer_addr(&self) -> Option { + self.get_ref().0.peer_addr() + } + #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { self.get_mut().0.set_nodelay(nodelay) diff --git a/tests/test_custom_pipeline.rs b/tests/test_custom_pipeline.rs new file mode 100644 index 00000000..cf1eeb5b --- /dev/null +++ b/tests/test_custom_pipeline.rs @@ -0,0 +1,81 @@ +extern crate actix; +extern crate actix_net; +extern crate actix_web; + +use std::{thread, time}; + +use actix::System; +use actix_net::server::Server; +use actix_net::service::NewServiceExt; +use actix_web::server::{HttpService, KeepAlive, ServiceConfig, StreamConfiguration}; +use actix_web::{client, test, App, HttpRequest}; + +#[test] +fn test_custom_pipeline() { + let addr = test::TestServer::unused_addr(); + + thread::spawn(move || { + Server::new() + .bind("test", addr, move || { + let app = App::new() + .route("/", http::Method::GET, |_: HttpRequest| "OK") + .finish(); + let settings = ServiceConfig::build(app) + .keep_alive(KeepAlive::Disabled) + .client_timeout(1000) + .client_shutdown(1000) + .server_hostname("localhost") + .server_address(addr) + .finish(); + + StreamConfiguration::new() + .nodelay(true) + .tcp_keepalive(Some(time::Duration::from_secs(10))) + .and_then(HttpService::new(settings)) + }).unwrap() + .run(); + }); + + let mut sys = System::new("test"); + { + let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) + .finish() + .unwrap(); + let response = sys.block_on(req.send()).unwrap(); + assert!(response.status().is_success()); + } +} + +#[test] +fn test_h1() { + use actix_web::server::H1Service; + + let addr = test::TestServer::unused_addr(); + thread::spawn(move || { + Server::new() + .bind("test", addr, move || { + let app = App::new() + .route("/", http::Method::GET, |_: HttpRequest| "OK") + .finish(); + let settings = ServiceConfig::build(app) + .keep_alive(KeepAlive::Disabled) + .client_timeout(1000) + .client_shutdown(1000) + .server_hostname("localhost") + .server_address(addr) + .finish(); + + H1Service::new(settings) + }).unwrap() + .run(); + }); + + let mut sys = System::new("test"); + { + let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) + .finish() + .unwrap(); + let response = sys.block_on(req.send()).unwrap(); + assert!(response.status().is_success()); + } +} diff --git a/tests/test_server.rs b/tests/test_server.rs index 8d9a400d..477d3e64 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -26,7 +26,6 @@ use std::io::{Read, Write}; use std::sync::Arc; use std::{thread, time}; -use actix_net::server::Server; #[cfg(feature = "brotli")] use brotli2::write::{BrotliDecoder, BrotliEncoder}; use bytes::{Bytes, BytesMut}; @@ -1223,48 +1222,6 @@ fn test_server_cookies() { } } -#[test] -fn test_custom_pipeline() { - use actix::System; - use actix_net::service::NewServiceExt; - use actix_web::server::{ - HttpService, KeepAlive, ServiceConfig, StreamConfiguration, - }; - - let addr = test::TestServer::unused_addr(); - - thread::spawn(move || { - Server::new() - .bind("test", addr, move || { - let app = App::new() - .route("/", http::Method::GET, |_: HttpRequest| "OK") - .finish(); - let settings = ServiceConfig::build(app) - .keep_alive(KeepAlive::Disabled) - .client_timeout(1000) - .client_shutdown(1000) - .server_hostname("localhost") - .server_address(addr) - .finish(); - - StreamConfiguration::new() - .nodelay(true) - .tcp_keepalive(Some(time::Duration::from_secs(10))) - .and_then(HttpService::new(settings)) - }).unwrap() - .run(); - }); - - let mut sys = System::new("test"); - { - let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) - .finish() - .unwrap(); - let response = sys.block_on(req.send()).unwrap(); - assert!(response.status().is_success()); - } -} - #[test] fn test_slow_request() { use actix::System;