1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-25 00:12:59 +01:00
actix-extras/src/channel.rs

135 lines
3.6 KiB
Rust
Raw Normal View History

2017-12-08 18:24:05 +01:00
use std::rc::Rc;
use std::net::SocketAddr;
use bytes::Bytes;
use futures::{Future, Poll, Async};
use tokio_io::{AsyncRead, AsyncWrite};
use {h1, h2};
2017-12-09 13:33:40 +01:00
use error::Error;
use h1writer::Writer;
use httprequest::HttpRequest;
2017-12-28 21:38:37 +01:00
use server::ServerSettings;
use worker::WorkerSettings;
/// Low level http request handler
2017-12-08 18:24:05 +01:00
#[allow(unused_variables)]
pub trait HttpHandler: 'static {
2017-12-09 13:33:40 +01:00
/// Handle request
2017-12-26 18:00:45 +01:00
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>;
}
2017-12-09 13:33:40 +01:00
pub trait HttpHandlerTask {
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error>;
fn poll(&mut self) -> Poll<(), Error>;
fn disconnected(&mut self);
}
2017-12-06 20:00:39 +01:00
/// Conversion helper trait
pub trait IntoHttpHandler {
/// The associated type which is result of conversion.
type Handler: HttpHandler;
/// Convert into `HttpHandler` object.
2017-12-29 20:33:04 +01:00
fn into_handler(self, settings: ServerSettings) -> Self::Handler;
2017-12-06 20:00:39 +01:00
}
impl<T: HttpHandler> IntoHttpHandler for T {
type Handler = T;
2017-12-29 20:33:04 +01:00
fn into_handler(self, _: ServerSettings) -> Self::Handler {
2017-12-06 20:00:39 +01:00
self
}
}
enum HttpProtocol<T, H>
2017-12-09 13:33:40 +01:00
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
H1(h1::Http1<T, H>),
H2(h2::Http2<T, H>),
}
2017-11-27 07:53:28 +01:00
#[doc(hidden)]
pub struct HttpChannel<T, H>
2017-12-09 13:33:40 +01:00
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
proto: Option<HttpProtocol<T, H>>,
}
impl<T, H> HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
2017-12-14 06:38:47 +01:00
pub(crate) fn new(h: Rc<WorkerSettings<H>>,
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
{
2017-12-29 01:25:47 +01:00
h.add_channel();
if http2 {
HttpChannel {
proto: Some(HttpProtocol::H2(
2017-12-08 18:24:05 +01:00
h2::Http2::new(h, io, peer, Bytes::new()))) }
} else {
HttpChannel {
proto: Some(HttpProtocol::H1(
2017-12-08 18:24:05 +01:00
h1::Http1::new(h, io, peer))) }
}
}
}
/*impl<T: 'static, A: 'static, H: 'static> Drop for HttpChannel<T, A, H> {
fn drop(&mut self) {
println!("Drop http channel");
}
}*/
impl<T, H> Future for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
match h1.poll() {
2017-12-29 01:25:47 +01:00
Ok(Async::Ready(h1::Http1Result::Done)) => {
h1.settings().remove_channel();
return Ok(Async::Ready(()))
}
2017-11-04 21:49:05 +01:00
Ok(Async::Ready(h1::Http1Result::Switch)) => (),
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
2017-12-29 01:25:47 +01:00
Err(_) => {
h1.settings().remove_channel();
return Err(())
}
}
}
Some(HttpProtocol::H2(ref mut h2)) => {
let result = h2.poll();
match result {
Ok(Async::Ready(())) | Err(_) => h2.settings().remove_channel(),
_ => (),
}
2017-12-29 01:25:47 +01:00
return result
}
None => unreachable!(),
}
// upgrade to h2
let proto = self.proto.take().unwrap();
match proto {
HttpProtocol::H1(h1) => {
2017-12-08 18:24:05 +01:00
let (h, io, addr, buf) = h1.into_inner();
2017-12-08 07:54:44 +01:00
self.proto = Some(
2017-12-08 18:24:05 +01:00
HttpProtocol::H2(h2::Http2::new(h, io, addr, buf)));
self.poll()
}
_ => unreachable!()
}
}
}