From b71ddf7b4cceb24041d29ee60d399580fb36b553 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 7 Dec 2017 21:52:46 -0800 Subject: [PATCH] pass local addr to channel; use bitflags --- Cargo.toml | 5 +-- src/channel.rs | 18 +++++--- src/h1.rs | 97 ++++++++++++++++++++++++++------------------ src/h1writer.rs | 41 +++++++++++-------- src/h2.rs | 73 ++++++++++++++++++++------------- src/h2writer.rs | 37 +++++++++-------- src/lib.rs | 3 +- src/server.rs | 40 +++++++++++------- tests/test_server.rs | 2 +- 9 files changed, 190 insertions(+), 126 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2ff98b27f..7af676c5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,6 @@ httparse = "0.1" http-range = "0.1" mime = "0.3" mime_guess = "1.8" -cookie = { version="0.10", features=["percent-encode", "secure"] } regex = "0.2" sha1 = "0.2" url = "1.5" @@ -53,8 +52,8 @@ flate2 = "0.2" brotli2 = "^0.3.2" percent-encoding = "1.0" smallvec = "0.6" - -# redis-async = { git="https://github.com/benashford/redis-async-rs" } +bitflags = "1.0" +cookie = { version="0.10", features=["percent-encode", "secure"] } # tokio bytes = "0.4" diff --git a/src/channel.rs b/src/channel.rs index ac9875a47..3a253862f 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -51,16 +51,21 @@ pub struct HttpChannel impl HttpChannel where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { - pub fn new(stream: T, addr: Option, router: Rc>, http2: bool) - -> HttpChannel { + pub fn new(stream: T, + local: SocketAddr, + secure: bool, + peer: Option, + router: Rc>, + http2: bool) -> HttpChannel + { if http2 { HttpChannel { proto: Some(HttpProtocol::H2( - h2::Http2::new(stream, addr, router, Bytes::new()))) } + h2::Http2::new(stream, local, secure, peer, router, Bytes::new()))) } } else { HttpChannel { proto: Some(HttpProtocol::H1( - h1::Http1::new(stream, addr, router))) } + h1::Http1::new(stream, local, secure, peer, router))) } } } } @@ -105,8 +110,9 @@ impl Future for HttpChannel let proto = self.proto.take().unwrap(); match proto { HttpProtocol::H1(h1) => { - let (stream, addr, router, buf) = h1.into_inner(); - self.proto = Some(HttpProtocol::H2(h2::Http2::new(stream, addr, router, buf))); + let (stream, local, secure, addr, router, buf) = h1.into_inner(); + self.proto = Some(HttpProtocol::H2( + h2::Http2::new(stream, local, secure, addr, router, buf))); self.poll() } _ => unreachable!() diff --git a/src/h1.rs b/src/h1.rs index b7fed38c7..54217610b 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -29,6 +29,24 @@ const MAX_HEADERS: usize = 100; const MAX_PIPELINED_MESSAGES: usize = 16; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; +bitflags! { + struct Flags: u8 { + const SECURE = 0b0000_0001; + const ERROR = 0b0000_0010; + const KEEPALIVE = 0b0000_0100; + const H2 = 0b0000_1000; + } +} + +bitflags! { + struct EntryFlags: u8 { + const EOF = 0b0000_0001; + const ERROR = 0b0000_0010; + const FINISHED = 0b0000_0100; + } +} + + pub(crate) enum Http1Result { Done, Switch, @@ -41,44 +59,44 @@ enum Item { } pub(crate) struct Http1 { + flags: Flags, router: Rc>, + local: SocketAddr, addr: Option, stream: H1Writer, reader: Reader, read_buf: BytesMut, - error: bool, tasks: VecDeque, - keepalive: bool, keepalive_timer: Option, - h2: bool, } struct Entry { pipe: Pipeline, - eof: bool, - error: bool, - finished: bool, + flags: EntryFlags, } impl Http1 where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { - pub fn new(stream: T, addr: Option, router: Rc>) -> Self { + pub fn new(stream: T, local: SocketAddr, secure: bool, + addr: Option, router: Rc>) -> Self { Http1{ router: router, + local: local, + flags: if secure { Flags::SECURE | Flags::KEEPALIVE } else { Flags::KEEPALIVE }, addr: addr, stream: H1Writer::new(stream), reader: Reader::new(), read_buf: BytesMut::new(), - error: false, tasks: VecDeque::new(), - keepalive: true, - keepalive_timer: None, - h2: false } + keepalive_timer: None } } - pub fn into_inner(mut self) -> (T, Option, Rc>, Bytes) { - (self.stream.unwrap(), self.addr, self.router, self.read_buf.freeze()) + pub fn into_inner(mut self) -> (T, SocketAddr, bool, + Option, Rc>, Bytes) { + (self.stream.unwrap(), self.local, + self.flags.contains(Flags::SECURE), + self.addr, self.router, self.read_buf.freeze()) } pub fn poll(&mut self) -> Poll { @@ -103,8 +121,8 @@ impl Http1 while idx < self.tasks.len() { let item = &mut self.tasks[idx]; - if !io && !item.eof { - if item.error { + if !io && !item.flags.contains(EntryFlags::EOF) { + if item.flags.contains(EntryFlags::ERROR) { return Err(()) } @@ -113,14 +131,16 @@ impl Http1 not_ready = false; // overide keep-alive state - if self.keepalive { - self.keepalive = self.stream.keepalive(); + if self.stream.keepalive() { + self.flags.insert(Flags::KEEPALIVE); + } else { + self.flags.remove(Flags::KEEPALIVE); } self.stream = H1Writer::new(self.stream.unwrap()); - item.eof = true; + item.flags.insert(EntryFlags::EOF); if ready { - item.finished = true; + item.flags.insert(EntryFlags::FINISHED); } }, Ok(Async::NotReady) => { @@ -134,15 +154,15 @@ impl Http1 return Err(()) } } - } else if !item.finished { + } else if !item.flags.contains(EntryFlags::FINISHED) { match item.pipe.poll() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { not_ready = false; - item.finished = true; + item.flags.insert(EntryFlags::FINISHED); }, Err(err) => { - item.error = true; + item.flags.insert(EntryFlags::ERROR); error!("Unhandled error: {}", err); } } @@ -152,7 +172,9 @@ impl Http1 // cleanup finished tasks while !self.tasks.is_empty() { - if self.tasks[0].eof && self.tasks[0].finished { + if self.tasks[0].flags.contains(EntryFlags::EOF) && + self.tasks[0].flags.contains(EntryFlags::FINISHED) + { self.tasks.pop_front(); } else { break @@ -160,8 +182,8 @@ impl Http1 } // no keep-alive - if !self.keepalive && self.tasks.is_empty() { - if self.h2 { + if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() { + if self.flags.contains(Flags::H2) { return Ok(Async::Ready(Http1Result::Switch)) } else { return Ok(Async::Ready(Http1Result::Done)) @@ -169,7 +191,8 @@ impl Http1 } // read incoming data - while !self.error && !self.h2 && self.tasks.len() < MAX_PIPELINED_MESSAGES { + while !self.flags.contains(Flags::ERROR) && !self.flags.contains(Flags::H2) && + self.tasks.len() < MAX_PIPELINED_MESSAGES { match self.reader.parse(self.stream.get_mut(), &mut self.read_buf) { Ok(Async::Ready(Item::Http1(mut req))) => { not_ready = false; @@ -194,16 +217,14 @@ impl Http1 self.tasks.push_back( Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), - eof: false, - error: false, - finished: false}); + flags: EntryFlags::empty()}); } Ok(Async::Ready(Item::Http2)) => { - self.h2 = true; + self.flags.insert(Flags::H2); } Err(ReaderError::Disconnect) => { not_ready = false; - self.error = true; + self.flags.insert(Flags::ERROR); self.stream.disconnected(); for entry in &mut self.tasks { entry.pipe.disconnected() @@ -218,26 +239,24 @@ impl Http1 } // kill keepalive - self.keepalive = false; + self.flags.remove(Flags::KEEPALIVE); self.keepalive_timer.take(); // on parse error, stop reading stream but tasks need to be completed - self.error = true; + self.flags.insert(Flags::ERROR); if self.tasks.is_empty() { if let ReaderError::Error(err) = err { self.tasks.push_back( Entry {pipe: Pipeline::error(err.error_response()), - eof: false, - error: false, - finished: false}); + flags: EntryFlags::empty()}); } } } Ok(Async::NotReady) => { // start keep-alive timer, this is also slow request timeout if self.tasks.is_empty() { - if self.keepalive { + if self.flags.contains(Flags::KEEPALIVE) { if self.keepalive_timer.is_none() { trace!("Start keep-alive timer"); let mut timeout = Timeout::new( @@ -259,10 +278,10 @@ impl Http1 // check for parse error if self.tasks.is_empty() { - if self.h2 { + if self.flags.contains(Flags::H2) { return Ok(Async::Ready(Http1Result::Switch)) } - if self.error || self.keepalive_timer.is_none() { + if self.flags.contains(Flags::ERROR) || self.keepalive_timer.is_none() { return Ok(Async::Ready(Http1Result::Done)) } } diff --git a/src/h1writer.rs b/src/h1writer.rs index 447758cf6..8776b4f4f 100644 --- a/src/h1writer.rs +++ b/src/h1writer.rs @@ -35,28 +35,30 @@ pub(crate) trait Writer { fn poll_complete(&mut self) -> Poll<(), io::Error>; } +bitflags! { + struct Flags: u8 { + const STARTED = 0b0000_0001; + const UPGRADE = 0b0000_0010; + const KEEPALIVE = 0b0000_0100; + const DISCONNECTED = 0b0000_1000; + } +} pub(crate) struct H1Writer { + flags: Flags, stream: Option, - started: bool, encoder: PayloadEncoder, - upgrade: bool, - keepalive: bool, - disconnected: bool, written: u64, - headers_size: u64, + headers_size: u32, } impl H1Writer { pub fn new(stream: T) -> H1Writer { H1Writer { + flags: Flags::empty(), stream: Some(stream), - started: false, encoder: PayloadEncoder::default(), - upgrade: false, - keepalive: false, - disconnected: false, written: 0, headers_size: 0, } @@ -75,7 +77,7 @@ impl H1Writer { } pub fn keepalive(&self) -> bool { - self.keepalive && !self.upgrade + self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) } fn write_to_stream(&mut self) -> Result { @@ -105,9 +107,10 @@ impl H1Writer { impl Writer for H1Writer { + #[cfg_attr(feature = "cargo-clippy", allow(cast_lossless))] fn written(&self) -> u64 { - if self.written > self.headers_size { - self.written - self.headers_size + if self.written > self.headers_size as u64 { + self.written - self.headers_size as u64 } else { 0 } @@ -119,9 +122,11 @@ impl Writer for H1Writer { trace!("Prepare response with status: {:?}", msg.status()); // prepare task - self.started = true; + self.flags.insert(Flags::STARTED); self.encoder = PayloadEncoder::new(req, msg); - self.keepalive = msg.keep_alive().unwrap_or_else(|| req.keep_alive()); + if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { + self.flags.insert(Flags::KEEPALIVE); + } // Connection upgrade let version = msg.version().unwrap_or_else(|| req.version()); @@ -129,7 +134,7 @@ impl Writer for H1Writer { msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("upgrade")); } // keep-alive - else if self.keepalive { + else if self.flags.contains(Flags::KEEPALIVE) { if version < Version::HTTP_11 { msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("keep-alive")); } @@ -177,7 +182,7 @@ impl Writer for H1Writer { // msg eof buffer.extend(b"\r\n"); - self.headers_size = buffer.len() as u64; + self.headers_size = buffer.len() as u32; } trace!("Response: {:?}", msg); @@ -193,8 +198,8 @@ impl Writer for H1Writer { } fn write(&mut self, payload: &[u8]) -> Result { - if !self.disconnected { - if self.started { + if !self.flags.contains(Flags::DISCONNECTED) { + if self.flags.contains(Flags::STARTED) { // TODO: add warning, write after EOF self.encoder.write(payload)?; } else { diff --git a/src/h2.rs b/src/h2.rs index cf89a719c..74f033ff8 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -25,13 +25,22 @@ use payload::{Payload, PayloadWriter}; const KEEPALIVE_PERIOD: u64 = 15; // seconds +bitflags! { + struct Flags: u8 { + const SECURE = 0b0000_0001; + const DISCONNECTED = 0b0000_0010; + } +} + +/// HTTP/2 Transport pub(crate) struct Http2 where T: AsyncRead + AsyncWrite + 'static, H: 'static { + flags: Flags, router: Rc>, + local: SocketAddr, addr: Option, state: State>, - disconnected: bool, tasks: VecDeque, keepalive_timer: Option, } @@ -46,10 +55,12 @@ impl Http2 where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { - pub fn new(stream: T, addr: Option, router: Rc>, buf: Bytes) -> Self { - Http2{ router: router, + pub fn new(stream: T, local: SocketAddr, secure: bool, + addr: Option, router: Rc>, buf: Bytes) -> Self { + Http2{ flags: if secure { Flags::SECURE } else { Flags::empty() }, + router: router, + local: local, addr: addr, - disconnected: false, tasks: VecDeque::new(), state: State::Handshake( Server::handshake(IoWrapper{unread: Some(buf), inner: stream})), @@ -80,33 +91,33 @@ impl Http2 // read payload item.poll_payload(); - if !item.eof { + if !item.flags.contains(EntryFlags::EOF) { match item.task.poll_io(&mut item.stream) { Ok(Async::Ready(ready)) => { - item.eof = true; + item.flags.insert(EntryFlags::EOF); if ready { - item.finished = true; + item.flags.insert(EntryFlags::FINISHED); } not_ready = false; }, Ok(Async::NotReady) => (), Err(err) => { error!("Unhandled error: {}", err); - item.eof = true; - item.error = true; + item.flags.insert(EntryFlags::EOF); + item.flags.insert(EntryFlags::ERROR); item.stream.reset(Reason::INTERNAL_ERROR); } } - } else if !item.finished { + } else if !item.flags.contains(EntryFlags::FINISHED) { match item.task.poll() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { not_ready = false; - item.finished = true; + item.flags.insert(EntryFlags::FINISHED); }, Err(err) => { - item.error = true; - item.finished = true; + item.flags.insert(EntryFlags::ERROR); + item.flags.insert(EntryFlags::FINISHED); error!("Unhandled error: {}", err); } } @@ -115,7 +126,10 @@ impl Http2 // cleanup finished tasks while !self.tasks.is_empty() { - if self.tasks[0].eof && self.tasks[0].finished || self.tasks[0].error { + if self.tasks[0].flags.contains(EntryFlags::EOF) && + self.tasks[0].flags.contains(EntryFlags::FINISHED) || + self.tasks[0].flags.contains(EntryFlags::ERROR) + { self.tasks.pop_front(); } else { break @@ -123,11 +137,11 @@ impl Http2 } // get request - if !self.disconnected { + if !self.flags.contains(Flags::DISCONNECTED) { match server.poll() { Ok(Async::Ready(None)) => { not_ready = false; - self.disconnected = true; + self.flags.insert(Flags::DISCONNECTED); for entry in &mut self.tasks { entry.task.disconnected() } @@ -156,7 +170,7 @@ impl Http2 } Err(err) => { trace!("Connection error: {}", err); - self.disconnected = true; + self.flags.insert(Flags::DISCONNECTED); for entry in &mut self.tasks { entry.task.disconnected() } @@ -166,7 +180,7 @@ impl Http2 } if not_ready { - if self.tasks.is_empty() && self.disconnected { + if self.tasks.is_empty() && self.flags.contains(Flags::DISCONNECTED) { return Ok(Async::Ready(())) } else { return Ok(Async::NotReady) @@ -196,16 +210,22 @@ impl Http2 } } +bitflags! { + struct EntryFlags: u8 { + const EOF = 0b0000_0001; + const REOF = 0b0000_0010; + const ERROR = 0b0000_0100; + const FINISHED = 0b0000_1000; + } +} + struct Entry { task: Pipeline, payload: PayloadType, recv: RecvStream, stream: H2Writer, - eof: bool, - error: bool, - finished: bool, - reof: bool, capacity: usize, + flags: EntryFlags, } impl Entry { @@ -244,22 +264,19 @@ impl Entry { payload: psender, recv: recv, stream: H2Writer::new(resp), - eof: false, - error: false, - finished: false, - reof: false, + flags: EntryFlags::empty(), capacity: 0, } } fn poll_payload(&mut self) { - if !self.reof { + if !self.flags.contains(EntryFlags::REOF) { match self.recv.poll() { Ok(Async::Ready(Some(chunk))) => { self.payload.feed_data(chunk); }, Ok(Async::Ready(None)) => { - self.reof = true; + self.flags.insert(EntryFlags::REOF); }, Ok(Async::NotReady) => (), Err(err) => { diff --git a/src/h2writer.rs b/src/h2writer.rs index 82a1b96e0..062c69e4e 100644 --- a/src/h2writer.rs +++ b/src/h2writer.rs @@ -16,14 +16,19 @@ use h1writer::{Writer, WriterState}; const CHUNK_SIZE: usize = 16_384; const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k +bitflags! { + struct Flags: u8 { + const STARTED = 0b0000_0001; + const DISCONNECTED = 0b0000_0010; + const EOF = 0b0000_0100; + } +} pub(crate) struct H2Writer { respond: Respond, stream: Option>, - started: bool, encoder: PayloadEncoder, - disconnected: bool, - eof: bool, + flags: Flags, written: u64, } @@ -33,10 +38,8 @@ impl H2Writer { H2Writer { respond: respond, stream: None, - started: false, encoder: PayloadEncoder::default(), - disconnected: false, - eof: true, + flags: Flags::empty(), written: 0, } } @@ -48,7 +51,7 @@ impl H2Writer { } fn write_to_stream(&mut self) -> Result { - if !self.started { + if !self.flags.contains(Flags::STARTED) { return Ok(WriterState::Done) } @@ -56,7 +59,7 @@ impl H2Writer { let buffer = self.encoder.get_mut(); if buffer.is_empty() { - if self.eof { + if self.flags.contains(Flags::EOF) { let _ = stream.send_data(Bytes::new(), true); } return Ok(WriterState::Done) @@ -77,7 +80,7 @@ impl H2Writer { Ok(Async::Ready(Some(cap))) => { let len = buffer.len(); let bytes = buffer.split_to(cmp::min(cap, len)); - let eof = buffer.is_empty() && self.eof; + let eof = buffer.is_empty() && self.flags.contains(Flags::EOF); self.written += bytes.len() as u64; if let Err(err) = stream.send_data(bytes.freeze(), eof) { @@ -111,9 +114,11 @@ impl Writer for H2Writer { trace!("Prepare response with status: {:?}", msg.status()); // prepare response - self.started = true; + self.flags.insert(Flags::STARTED); self.encoder = PayloadEncoder::new(req, msg); - self.eof = if let Body::Empty = *msg.body() { true } else { false }; + if let Body::Empty = *msg.body() { + self.flags.insert(Flags::EOF); + } // http2 specific msg.headers_mut().remove(CONNECTION); @@ -140,7 +145,7 @@ impl Writer for H2Writer { resp.headers_mut().insert(key, value.clone()); } - match self.respond.send_response(resp, self.eof) { + match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) { Ok(stream) => self.stream = Some(stream), Err(_) => @@ -151,7 +156,7 @@ impl Writer for H2Writer { if msg.body().is_binary() { if let Body::Binary(bytes) = msg.replace_body(Body::Empty) { - self.eof = true; + self.flags.insert(Flags::EOF); self.encoder.write(bytes.as_ref())?; if let Some(ref mut stream) = self.stream { stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); @@ -164,8 +169,8 @@ impl Writer for H2Writer { } fn write(&mut self, payload: &[u8]) -> Result { - if !self.disconnected { - if self.started { + if !self.flags.contains(Flags::DISCONNECTED) { + if self.flags.contains(Flags::STARTED) { // TODO: add warning, write after EOF self.encoder.write(payload)?; } else { @@ -184,7 +189,7 @@ impl Writer for H2Writer { fn write_eof(&mut self) -> Result { self.encoder.write_eof()?; - self.eof = true; + self.flags.insert(Flags::EOF); if !self.encoder.is_eof() { Err(io::Error::new(io::ErrorKind::Other, "Last payload item, but eof is not reached")) diff --git a/src/lib.rs b/src/lib.rs index b772ec9fb..a2d9d6830 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,8 @@ extern crate bytes; extern crate sha1; extern crate regex; #[macro_use] +extern crate bitflags; +#[macro_use] extern crate futures; extern crate tokio_io; extern crate tokio_core; @@ -61,7 +63,6 @@ mod route; mod router; mod param; mod resource; -// mod recognizer; mod handler; mod pipeline; mod server; diff --git a/src/server.rs b/src/server.rs index 0a58449ff..635147b51 100644 --- a/src/server.rs +++ b/src/server.rs @@ -26,7 +26,6 @@ use tokio_openssl::{SslStream, SslAcceptorExt}; use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; - /// An HTTP Server /// /// `T` - async stream, anything that implements `AsyncRead` + `AsyncWrite`. @@ -64,12 +63,15 @@ impl HttpServer H: HttpHandler, { /// Start listening for incomming connections from stream. - pub fn serve_incoming(self, stream: S) -> io::Result + pub fn serve_incoming(self, stream: S, secure: bool) -> io::Result where Self: ActorAddress, S: Stream + 'static { Ok(HttpServer::create(move |ctx| { - ctx.add_stream(stream.map(|(t, _)| IoStream(t, None, false))); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + ctx.add_stream(stream.map( + move |(t, _)| IoStream{io: t, srv: addr, + peer: None, http2: false, secure: secure})); self })) } @@ -114,7 +116,9 @@ impl HttpServer { Ok(HttpServer::create(move |ctx| { for (addr, tcp) in addrs { info!("Starting http server on {}", addr); - ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, Some(a), false))); + ctx.add_stream(tcp.incoming().map( + move |(t, a)| IoStream{io: t, srv: addr, + peer: Some(a), http2: false, secure: false})); } self })) @@ -144,15 +148,15 @@ impl HttpServer, net::SocketAddr, H> { }; Ok(HttpServer::create(move |ctx| { - for (addr, tcp) in addrs { - info!("Starting tls http server on {}", addr); + for (srv, tcp) in addrs { + info!("Starting tls http server on {}", srv); let acc = acceptor.clone(); ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| { TlsAcceptorExt::accept_async(acc.as_ref(), stream) - .map(move |t| { - IoStream(t, Some(addr), false) - }) + .map(move |t| + IoStream{io: t, srv: srv.clone(), + peer: Some(addr), http2: false, secure: true}) .map_err(|err| { trace!("Error during handling tls connection: {}", err); io::Error::new(io::ErrorKind::Other, err) @@ -191,8 +195,8 @@ impl HttpServer, net::SocketAddr, H> { }; Ok(HttpServer::create(move |ctx| { - for (addr, tcp) in addrs { - info!("Starting tls http server on {}", addr); + for (srv, tcp) in addrs { + info!("Starting tls http server on {}", srv); let acc = acceptor.clone(); ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| { @@ -205,7 +209,8 @@ impl HttpServer, net::SocketAddr, H> { } else { false }; - IoStream(stream, Some(addr), http2) + IoStream{io: stream, srv: srv.clone(), + peer: Some(addr), http2: http2, secure: true} }) .map_err(|err| { trace!("Error during handling tls connection: {}", err); @@ -218,7 +223,13 @@ impl HttpServer, net::SocketAddr, H> { } } -struct IoStream(T, Option, bool); +struct IoStream { + io: T, + srv: SocketAddr, + peer: Option, + http2: bool, + secure: bool, +} impl ResponseType for IoStream where T: AsyncRead + AsyncWrite + 'static @@ -245,7 +256,8 @@ impl Handler, io::Error> for HttpServer -> Response> { Arbiter::handle().spawn( - HttpChannel::new(msg.0, msg.1, Rc::clone(&self.h), msg.2)); + HttpChannel::new(msg.io, msg.srv, msg.secure, + msg.peer, Rc::clone(&self.h), msg.http2)); Self::empty() } } diff --git a/tests/test_server.rs b/tests/test_server.rs index 7704c3e35..b1715f252 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -39,7 +39,7 @@ fn test_serve_incoming() { Application::new("/") .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))); let tcp = TcpListener::from_listener(tcp, &addr2, Arbiter::handle()).unwrap(); - srv.serve_incoming::<_, ()>(tcp.incoming()).unwrap(); + srv.serve_incoming::<_, ()>(tcp.incoming(), false).unwrap(); sys.run(); });