use std::{ptr, mem, time, io}; use std::rc::Rc; use std::net::{SocketAddr, Shutdown}; use bytes::{Bytes, Buf, BufMut}; use futures::{Future, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; use super::{h1, h2, HttpHandler, IoStream}; use super::settings::WorkerSettings; enum HttpProtocol { H1(h1::Http1), H2(h2::Http2), } #[doc(hidden)] pub struct HttpChannel where T: IoStream, H: HttpHandler + 'static { proto: Option>, node: Option>>, } impl HttpChannel where T: IoStream, H: HttpHandler + 'static { pub(crate) fn new(settings: Rc>, io: T, peer: Option, http2: bool) -> HttpChannel { settings.add_channel(); if http2 { HttpChannel { node: None, proto: Some(HttpProtocol::H2( h2::Http2::new(settings, io, peer, Bytes::new()))) } } else { HttpChannel { node: None, proto: Some(HttpProtocol::H1(h1::Http1::new(settings, io, peer))) } } } fn shutdown(&mut self) { match self.proto { Some(HttpProtocol::H1(ref mut h1)) => { let io = h1.io(); let _ = IoStream::set_linger(io, Some(time::Duration::new(0, 0))); let _ = IoStream::shutdown(io, Shutdown::Both); } Some(HttpProtocol::H2(ref mut h2)) => { h2.shutdown() } _ => unreachable!(), } } } impl Future for HttpChannel where T: IoStream, H: HttpHandler + 'static { type Item = (); type Error = (); fn poll(&mut self) -> Poll { if self.node.is_none() { self.node = Some(Node::new(self)); match self.proto { Some(HttpProtocol::H1(ref mut h1)) => { h1.settings().head().insert(self.node.as_ref().unwrap()); } Some(HttpProtocol::H2(ref mut h2)) => { h2.settings().head().insert(self.node.as_ref().unwrap()); } _ => unreachable!(), } } match self.proto { Some(HttpProtocol::H1(ref mut h1)) => { match h1.poll() { Ok(Async::Ready(h1::Http1Result::Done)) => { h1.settings().remove_channel(); self.node.as_ref().unwrap().remove(); return Ok(Async::Ready(())) } Ok(Async::Ready(h1::Http1Result::Switch)) => (), Ok(Async::NotReady) => return Ok(Async::NotReady), Err(_) => { h1.settings().remove_channel(); self.node.as_ref().unwrap().remove(); return Err(()) } } } Some(HttpProtocol::H2(ref mut h2)) => { let result = h2.poll(); match result { Ok(Async::Ready(())) | Err(_) => { h2.settings().remove_channel(); self.node.as_ref().unwrap().remove(); } _ => (), } return result } None => unreachable!(), } // upgrade to h2 let proto = self.proto.take().unwrap(); match proto { HttpProtocol::H1(h1) => { let (h, io, addr, buf) = h1.into_inner(); self.proto = Some( HttpProtocol::H2(h2::Http2::new(h, io, addr, buf))); self.poll() } _ => unreachable!() } } } pub(crate) struct Node { next: Option<*mut Node<()>>, prev: Option<*mut Node<()>>, element: *mut T, } impl Node { fn new(el: &mut T) -> Self { Node { next: None, prev: None, element: el as *mut _, } } fn insert(&self, next: &Node) { #[allow(mutable_transmutes)] unsafe { if let Some(ref next2) = self.next { let n: &mut Node<()> = mem::transmute(next2.as_ref().unwrap()); n.prev = Some(next as *const _ as *mut _); } let slf: &mut Node = mem::transmute(self); slf.next = Some(next as *const _ as *mut _); let next: &mut Node = mem::transmute(next); next.prev = Some(slf as *const _ as *mut _); } } fn remove(&self) { #[allow(mutable_transmutes)] unsafe { if let Some(ref prev) = self.prev { let p: &mut Node<()> = mem::transmute(prev.as_ref().unwrap()); let slf: &mut Node = mem::transmute(self); p.next = slf.next.take(); } } } } impl Node<()> { pub(crate) fn head() -> Self { Node { next: None, prev: None, element: ptr::null_mut(), } } pub(crate) fn traverse(&self) where T: IoStream, H: HttpHandler + 'static { let mut next = self.next.as_ref(); loop { if let Some(n) = next { unsafe { let n: &Node<()> = mem::transmute(n.as_ref().unwrap()); next = n.next.as_ref(); if !n.element.is_null() { let ch: &mut HttpChannel = mem::transmute( &mut *(n.element as *mut _)); ch.shutdown(); } } } else { return } } } } impl IoStream for TcpStream { #[inline] fn shutdown(&mut self, how: Shutdown) -> io::Result<()> { TcpStream::shutdown(self, how) } #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { TcpStream::set_nodelay(self, nodelay) } #[inline] fn set_linger(&mut self, dur: Option) -> io::Result<()> { TcpStream::set_linger(self, dur) } } /// Wrapper for `AsyncRead + AsyncWrite` types pub(crate) struct WrapperStream where T: AsyncRead + AsyncWrite + 'static { io: T, } impl WrapperStream where T: AsyncRead + AsyncWrite + 'static { pub fn new(io: T) -> Self { WrapperStream{io: io} } } impl IoStream for WrapperStream where T: AsyncRead + AsyncWrite + 'static { #[inline] fn shutdown(&mut self, _: Shutdown) -> io::Result<()> { Ok(()) } #[inline] fn set_nodelay(&mut self, _: bool) -> io::Result<()> { Ok(()) } #[inline] fn set_linger(&mut self, _: Option) -> io::Result<()> { Ok(()) } } impl io::Read for WrapperStream where T: AsyncRead + AsyncWrite + 'static { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { self.io.read(buf) } } impl io::Write for WrapperStream where T: AsyncRead + AsyncWrite + 'static { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { self.io.write(buf) } #[inline] fn flush(&mut self) -> io::Result<()> { self.io.flush() } } impl AsyncRead for WrapperStream where T: AsyncRead + AsyncWrite + 'static { fn read_buf(&mut self, buf: &mut B) -> Poll { self.io.read_buf(buf) } } impl AsyncWrite for WrapperStream where T: AsyncRead + AsyncWrite + 'static { fn shutdown(&mut self) -> Poll<(), io::Error> { self.io.shutdown() } fn write_buf(&mut self, buf: &mut B) -> Poll { self.io.write_buf(buf) } } #[cfg(feature="alpn")] use tokio_openssl::SslStream; #[cfg(feature="alpn")] impl IoStream for SslStream { #[inline] fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { let _ = self.get_mut().shutdown(); Ok(()) } #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { self.get_mut().get_mut().set_nodelay(nodelay) } #[inline] fn set_linger(&mut self, dur: Option) -> io::Result<()> { self.get_mut().get_mut().set_linger(dur) } } #[cfg(feature="tls")] use tokio_tls::TlsStream; #[cfg(feature="tls")] impl IoStream for TlsStream { #[inline] fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { let _ = self.get_mut().shutdown(); Ok(()) } #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { self.get_mut().get_mut().set_nodelay(nodelay) } #[inline] fn set_linger(&mut self, dur: Option) -> io::Result<()> { self.get_mut().get_mut().set_linger(dur) } }