From a116c4c2c799e24f21bd57edde03d38ae64abea7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 16 Apr 2019 09:54:02 -0700 Subject: [PATCH] Expose peer addr via Request::peer_addr() and RequestHead::peer_addr --- Cargo.toml | 8 ++--- actix-http/CHANGES.md | 6 +++- actix-http/Cargo.toml | 14 ++++---- actix-http/src/h1/codec.rs | 9 ++--- actix-http/src/h1/dispatcher.rs | 20 ++++++----- actix-http/src/h1/service.rs | 20 +++++------ actix-http/src/h2/dispatcher.rs | 19 +++++----- actix-http/src/h2/service.rs | 58 +++++++++++++++++------------- actix-http/src/message.rs | 3 ++ actix-http/src/request.rs | 12 ++++++- actix-http/src/service.rs | 64 ++++++++++++++++++++++++++------- actix-http/src/test.rs | 15 ++++++++ actix-http/tests/test_server.rs | 7 +++- 13 files changed, 170 insertions(+), 85 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2835d6c3..68979d096 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,8 +72,8 @@ actix-router = "0.1.2" actix-rt = "0.2.2" actix-web-codegen = "0.1.0-alpha.6" actix-http = { version = "0.1.0-alpha.5", features=["fail"] } -actix-server = "0.4.2" -actix-server-config = "0.1.0" +actix-server = "0.4.3" +actix-server-config = "0.1.1" actix-threadpool = "0.1.0" awc = { version = "0.1.0-alpha.6", optional = true } @@ -81,7 +81,7 @@ bytes = "0.4" derive_more = "0.14" encoding = "0.2" futures = "0.1" -hashbrown = "0.2.1" +hashbrown = "0.2.2" log = "0.4" mime = "0.3" net2 = "0.2.33" @@ -121,4 +121,4 @@ actix-web-codegen = { path = "actix-web-codegen" } actix-web-actors = { path = "actix-web-actors" } actix-session = { path = "actix-session" } actix-files = { path = "actix-files" } -awc = { path = "awc" } \ No newline at end of file +awc = { path = "awc" } diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 236436bb1..dc56b0401 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,10 @@ # Changes -## [0.1.0] - 2019-04-xx +## [0.1.0] - 2019-04-16 + +### Added + +* Expose peer addr via `Request::peer_addr()` and `RequestHead::peer_addr` ### Changed diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index f7ca0bce1..746699a07 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "0.1.0-alpha.5" +version = "0.1.0" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" @@ -26,7 +26,7 @@ path = "src/lib.rs" default = [] # openssl -ssl = ["openssl", "actix-connect/ssl"] +ssl = ["openssl", "actix-connect/ssl", "actix-server-config/ssl"] # brotli encoding, requires c compiler brotli = ["brotli2"] @@ -48,7 +48,7 @@ actix-service = "0.3.6" actix-codec = "0.1.2" actix-connect = "0.1.4" actix-utils = "0.3.5" -actix-server-config = "0.1.0" +actix-server-config = "0.1.1" actix-threadpool = "0.1.0" base64 = "0.10" @@ -60,7 +60,7 @@ derive_more = "0.14" either = "1.5.2" encoding = "0.2" futures = "0.1" -hashbrown = "0.2.0" +hashbrown = "0.2.2" h2 = "0.1.16" http = "0.1.17" httparse = "1.3" @@ -76,10 +76,10 @@ serde = "1.0" serde_json = "1.0" sha1 = "0.6" slab = "0.4" -serde_urlencoded = "0.5.3" +serde_urlencoded = "0.5.5" time = "0.1" tokio-tcp = "0.1.3" -tokio-timer = "0.2" +tokio-timer = "0.2.8" tokio-current-thread = "0.1" trust-dns-resolver = { version="0.11.0", default-features = false } @@ -96,7 +96,7 @@ openssl = { version="0.10", optional = true } [dev-dependencies] actix-rt = "0.2.2" -actix-server = { version = "0.4.1", features=["ssl"] } +actix-server = { version = "0.4.3", features=["ssl"] } actix-connect = { version = "0.1.4", features=["ssl"] } actix-http-test = { version = "0.1.0-alpha.3", features=["ssl"] } env_logger = "0.6" diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index 1f3983adb..1e1e1602f 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -1,6 +1,6 @@ #![allow(unused_imports, unused_variables, dead_code)] -use std::fmt; -use std::io::{self, Write}; +use std::io::Write; +use std::{fmt, io, net}; use actix_codec::{Decoder, Encoder}; use bitflags::bitflags; @@ -40,7 +40,6 @@ pub struct Codec { // encoder part flags: Flags, encoder: encoder::MessageEncoder>, - // headers_size: u32, } impl Default for Codec { @@ -67,13 +66,11 @@ impl Codec { }; Codec { config, + flags, decoder: decoder::MessageDecoder::default(), payload: None, version: Version::HTTP_11, ctype: ConnectionType::Close, - - flags, - // headers_size: 0, encoder: encoder::MessageEncoder::default(), } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index cf39b8232..758466837 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -1,8 +1,9 @@ use std::collections::VecDeque; use std::time::Instant; -use std::{fmt, io}; +use std::{fmt, io, net}; -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; +use actix_codec::{Decoder, Encoder, Framed, FramedParts}; +use actix_server_config::IoStream; use actix_service::Service; use actix_utils::cloneable::CloneableService; use bitflags::bitflags; @@ -81,6 +82,7 @@ where expect: CloneableService, upgrade: Option>, flags: Flags, + peer_addr: Option, error: Option, state: State, @@ -161,7 +163,7 @@ impl PartialEq for PollResponse { impl Dispatcher where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Response: Into>, @@ -220,14 +222,15 @@ where Dispatcher { inner: DispatcherState::Normal(InnerDispatcher { - io, - codec, - read_buf, write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), payload: None, state: State::None, error: None, + peer_addr: io.peer_addr(), messages: VecDeque::new(), + io, + codec, + read_buf, service, expect, upgrade, @@ -241,7 +244,7 @@ where impl InnerDispatcher where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Response: Into>, @@ -490,6 +493,7 @@ where match msg { Message::Item(mut req) => { let pl = self.codec.message_type(); + req.head_mut().peer_addr = self.peer_addr; if pl == MessageType::Stream && self.upgrade.is_some() { self.messages.push_back(DispatcherMessage::Upgrade(req)); @@ -649,7 +653,7 @@ where impl Future for Dispatcher where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Response: Into>, diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index f92fd0c89..ecf6c8b93 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -1,8 +1,8 @@ use std::fmt; use std::marker::PhantomData; -use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_server_config::{Io, ServerConfig as SrvConfig}; +use actix_codec::Framed; +use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig}; use actix_service::{IntoNewService, NewService, Service}; use actix_utils::cloneable::CloneableService; use futures::future::{ok, FutureResult}; @@ -104,7 +104,7 @@ where impl NewService for H1Service where - T: AsyncRead + AsyncWrite, + T: IoStream, S: NewService, S::Error: Into, S::Response: Into>, @@ -161,7 +161,7 @@ where impl Future for H1ServiceResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, S: NewService, S::Error: Into, S::Response: Into>, @@ -245,7 +245,7 @@ where impl Service for H1ServiceHandler where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Response: Into>, @@ -309,7 +309,7 @@ pub struct OneRequest { impl OneRequest where - T: AsyncRead + AsyncWrite, + T: IoStream, { /// Create new `H1SimpleService` instance. pub fn new() -> Self { @@ -322,7 +322,7 @@ where impl NewService for OneRequest where - T: AsyncRead + AsyncWrite, + T: IoStream, { type Request = Io; type Response = (Request, Framed); @@ -348,7 +348,7 @@ pub struct OneRequestService { impl Service for OneRequestService where - T: AsyncRead + AsyncWrite, + T: IoStream, { type Request = Io; type Response = (Request, Framed); @@ -372,14 +372,14 @@ where #[doc(hidden)] pub struct OneRequestServiceResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, { framed: Option>, } impl Future for OneRequestServiceResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, { type Item = (Request, Framed); type Error = ParseError; diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index de0b761f5..e66ff63c3 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -1,9 +1,10 @@ use std::collections::VecDeque; use std::marker::PhantomData; use std::time::Instant; -use std::{fmt, mem}; +use std::{fmt, mem, net}; use actix_codec::{AsyncRead, AsyncWrite}; +use actix_server_config::IoStream; use actix_service::Service; use actix_utils::cloneable::CloneableService; use bitflags::bitflags; @@ -29,14 +30,11 @@ use crate::response::Response; const CHUNK_SIZE: usize = 16_384; /// Dispatcher for HTTP/2 protocol -pub struct Dispatcher< - T: AsyncRead + AsyncWrite, - S: Service, - B: MessageBody, -> { +pub struct Dispatcher, B: MessageBody> { service: CloneableService, connection: Connection, config: ServiceConfig, + peer_addr: Option, ka_expire: Instant, ka_timer: Option, _t: PhantomData, @@ -44,7 +42,7 @@ pub struct Dispatcher< impl Dispatcher where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -56,6 +54,7 @@ where connection: Connection, config: ServiceConfig, timeout: Option, + peer_addr: Option, ) -> Self { // let keepalive = config.keep_alive_enabled(); // let flags = if keepalive { @@ -76,9 +75,10 @@ where Dispatcher { service, config, + peer_addr, + connection, ka_expire, ka_timer, - connection, _t: PhantomData, } } @@ -86,7 +86,7 @@ where impl Future for Dispatcher where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -117,6 +117,7 @@ where head.method = parts.method; head.version = parts.version; head.headers = parts.headers.into(); + head.peer_addr = self.peer_addr; tokio_current_thread::spawn(ServiceResponse:: { state: ServiceResponseState::ServiceCall( self.service.call(req), diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 8ab244b50..42b8d8d82 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::{io, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_server_config::{Io, ServerConfig as SrvConfig}; +use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig}; use actix_service::{IntoNewService, NewService, Service}; use actix_utils::cloneable::CloneableService; use bytes::Bytes; @@ -63,7 +63,7 @@ where impl NewService for H2Service where - T: AsyncRead + AsyncWrite, + T: IoStream, S: NewService, S::Error: Into, S::Response: Into>, @@ -95,7 +95,7 @@ pub struct H2ServiceResponse, impl Future for H2ServiceResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, S: NewService, S::Error: Into, S::Response: Into>, @@ -140,7 +140,7 @@ where impl Service for H2ServiceHandler where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -161,17 +161,20 @@ where } fn call(&mut self, req: Self::Request) -> Self::Future { + let io = req.into_parts().0; + let peer_addr = io.peer_addr(); H2ServiceHandlerResponse { state: State::Handshake( Some(self.srv.clone()), Some(self.cfg.clone()), - server::handshake(req.into_parts().0), + peer_addr, + server::handshake(io), ), } } } -enum State, B: MessageBody> +enum State, B: MessageBody> where S::Future: 'static, { @@ -179,13 +182,14 @@ where Handshake( Option>, Option, + Option, Handshake, ), } pub struct H2ServiceHandlerResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -197,7 +201,7 @@ where impl Future for H2ServiceHandlerResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -210,24 +214,28 @@ where fn poll(&mut self) -> Poll { match self.state { State::Incoming(ref mut disp) => disp.poll(), - State::Handshake(ref mut srv, ref mut config, ref mut handshake) => { - match handshake.poll() { - Ok(Async::Ready(conn)) => { - self.state = State::Incoming(Dispatcher::new( - srv.take().unwrap(), - conn, - config.take().unwrap(), - None, - )); - self.poll() - } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => { - trace!("H2 handshake error: {}", err); - Err(err.into()) - } + State::Handshake( + ref mut srv, + ref mut config, + ref peer_addr, + ref mut handshake, + ) => match handshake.poll() { + Ok(Async::Ready(conn)) => { + self.state = State::Incoming(Dispatcher::new( + srv.take().unwrap(), + conn, + config.take().unwrap(), + None, + peer_addr.clone(), + )); + self.poll() } - } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => { + trace!("H2 handshake error: {}", err); + Err(err.into()) + } + }, } } } diff --git a/actix-http/src/message.rs b/actix-http/src/message.rs index 61ca5161e..7f2dc603f 100644 --- a/actix-http/src/message.rs +++ b/actix-http/src/message.rs @@ -1,4 +1,5 @@ use std::cell::{Ref, RefCell, RefMut}; +use std::net; use std::rc::Rc; use bitflags::bitflags; @@ -43,6 +44,7 @@ pub struct RequestHead { pub version: Version, pub headers: HeaderMap, pub extensions: RefCell, + pub peer_addr: Option, flags: Flags, } @@ -54,6 +56,7 @@ impl Default for RequestHead { version: Version::HTTP_11, headers: HeaderMap::with_capacity(16), flags: Flags::empty(), + peer_addr: None, extensions: RefCell::new(Extensions::new()), } } diff --git a/actix-http/src/request.rs b/actix-http/src/request.rs index 468b4e337..5ba07929a 100644 --- a/actix-http/src/request.rs +++ b/actix-http/src/request.rs @@ -1,5 +1,5 @@ use std::cell::{Ref, RefMut}; -use std::fmt; +use std::{fmt, net}; use http::{header, Method, Uri, Version}; @@ -139,6 +139,7 @@ impl

Request

{ } /// Check if request requires connection upgrade + #[inline] pub fn upgrade(&self) -> bool { if let Some(conn) = self.head().headers.get(header::CONNECTION) { if let Ok(s) = conn.to_str() { @@ -147,6 +148,15 @@ impl

Request

{ } self.head().method == Method::CONNECT } + + /// Peer socket address + /// + /// Peer address is actual socket address, if proxy is used in front of + /// actix http server, then peer address would be address of this proxy. + #[inline] + pub fn peer_addr(&self) -> Option { + self.head().peer_addr + } } impl

fmt::Debug for Request

{ diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 2af1238b1..dd3af1db0 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -1,8 +1,10 @@ use std::marker::PhantomData; -use std::{fmt, io}; +use std::{fmt, io, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_server_config::{Io as ServerIo, Protocol, ServerConfig as SrvConfig}; +use actix_server_config::{ + Io as ServerIo, IoStream, Protocol, ServerConfig as SrvConfig, +}; use actix_service::{IntoNewService, NewService, Service}; use actix_utils::cloneable::CloneableService; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -128,7 +130,7 @@ where impl NewService for HttpService where - T: AsyncRead + AsyncWrite, + T: IoStream, S: NewService, S::Error: Into, S::InitError: fmt::Debug, @@ -182,7 +184,7 @@ pub struct HttpServiceResponse< impl Future for HttpServiceResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, S: NewService, S::Error: Into, S::InitError: fmt::Debug, @@ -268,7 +270,7 @@ where impl Service for HttpServiceHandler where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -317,6 +319,7 @@ where let (io, _, proto) = req.into_parts(); match proto { Protocol::Http2 => { + let peer_addr = io.peer_addr(); let io = Io { inner: io, unread: None, @@ -326,6 +329,7 @@ where server::handshake(io), self.cfg.clone(), self.srv.clone(), + peer_addr, ))), } } @@ -357,7 +361,7 @@ where S: Service, S::Future: 'static, S::Error: Into, - T: AsyncRead + AsyncWrite, + T: IoStream, B: MessageBody, X: Service, X::Error: Into, @@ -376,12 +380,19 @@ where Option>, )>, ), - Handshake(Option<(Handshake, Bytes>, ServiceConfig, CloneableService)>), + Handshake( + Option<( + Handshake, Bytes>, + ServiceConfig, + CloneableService, + Option, + )>, + ), } pub struct HttpServiceHandlerResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -399,7 +410,7 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; impl Future for HttpServiceHandlerResponse where - T: AsyncRead + AsyncWrite, + T: IoStream, S: Service, S::Error: Into, S::Future: 'static, @@ -437,12 +448,17 @@ where } let (io, buf, cfg, srv, expect, upgrade) = data.take().unwrap(); if buf[..14] == HTTP2_PREFACE[..] { + let peer_addr = io.peer_addr(); let io = Io { inner: io, unread: Some(buf), }; - self.state = - State::Handshake(Some((server::handshake(io), cfg, srv))); + self.state = State::Handshake(Some(( + server::handshake(io), + cfg, + srv, + peer_addr, + ))); } else { self.state = State::H1(h1::Dispatcher::with_timeout( io, @@ -470,8 +486,8 @@ where } else { panic!() }; - let (_, cfg, srv) = data.take().unwrap(); - self.state = State::H2(Dispatcher::new(srv, conn, cfg, None)); + let (_, cfg, srv, peer_addr) = data.take().unwrap(); + self.state = State::H2(Dispatcher::new(srv, conn, cfg, None, peer_addr)); self.poll() } } @@ -523,3 +539,25 @@ impl AsyncWrite for Io { self.inner.write_buf(buf) } } + +impl IoStream for Io { + #[inline] + fn peer_addr(&self) -> Option { + self.inner.peer_addr() + } + + #[inline] + fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { + self.inner.set_nodelay(nodelay) + } + + #[inline] + fn set_linger(&mut self, dur: Option) -> io::Result<()> { + self.inner.set_linger(dur) + } + + #[inline] + fn set_keepalive(&mut self, dur: Option) -> io::Result<()> { + self.inner.set_keepalive(dur) + } +} diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index ce55912f7..b4344a676 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -4,6 +4,7 @@ use std::io; use std::str::FromStr; use actix_codec::{AsyncRead, AsyncWrite}; +use actix_server_config::IoStream; use bytes::{Buf, Bytes, BytesMut}; use futures::{Async, Poll}; use http::header::{self, HeaderName, HeaderValue}; @@ -253,3 +254,17 @@ impl AsyncWrite for TestBuffer { Ok(Async::NotReady) } } + +impl IoStream for TestBuffer { + fn set_nodelay(&mut self, _nodelay: bool) -> io::Result<()> { + Ok(()) + } + + fn set_linger(&mut self, _dur: Option) -> io::Result<()> { + Ok(()) + } + + fn set_keepalive(&mut self, _dur: Option) -> io::Result<()> { + Ok(()) + } +} diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index e53ff0212..4b56e4b2c 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -35,7 +35,10 @@ fn test_h1() { .keep_alive(KeepAlive::Disabled) .client_timeout(1000) .client_disconnect(1000) - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|req: Request| { + assert!(req.peer_addr().is_some()); + future::ok::<_, ()>(Response::Ok().finish()) + }) }); let response = srv.block_on(srv.get("/").send()).unwrap(); @@ -50,6 +53,7 @@ fn test_h1_2() { .client_timeout(1000) .client_disconnect(1000) .finish(|req: Request| { + assert!(req.peer_addr().is_some()); assert_eq!(req.version(), http::Version::HTTP_11); future::ok::<_, ()>(Response::Ok().finish()) }) @@ -115,6 +119,7 @@ fn test_h2_1() -> std::io::Result<()> { .and_then( HttpService::build() .finish(|req: Request| { + assert!(req.peer_addr().is_some()); assert_eq!(req.version(), http::Version::HTTP_2); future::ok::<_, Error>(Response::Ok().finish()) })