1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

pass local addr to channel; use bitflags

This commit is contained in:
Nikolay Kim 2017-12-07 21:52:46 -08:00
parent 3f06439d3e
commit b71ddf7b4c
9 changed files with 190 additions and 126 deletions

View File

@ -42,7 +42,6 @@ httparse = "0.1"
http-range = "0.1" http-range = "0.1"
mime = "0.3" mime = "0.3"
mime_guess = "1.8" mime_guess = "1.8"
cookie = { version="0.10", features=["percent-encode", "secure"] }
regex = "0.2" regex = "0.2"
sha1 = "0.2" sha1 = "0.2"
url = "1.5" url = "1.5"
@ -53,8 +52,8 @@ flate2 = "0.2"
brotli2 = "^0.3.2" brotli2 = "^0.3.2"
percent-encoding = "1.0" percent-encoding = "1.0"
smallvec = "0.6" smallvec = "0.6"
bitflags = "1.0"
# redis-async = { git="https://github.com/benashford/redis-async-rs" } cookie = { version="0.10", features=["percent-encode", "secure"] }
# tokio # tokio
bytes = "0.4" bytes = "0.4"

View File

@ -51,16 +51,21 @@ pub struct HttpChannel<T, H>
impl<T, H> HttpChannel<T, H> impl<T, H> HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{ {
pub fn new(stream: T, addr: Option<SocketAddr>, router: Rc<Vec<H>>, http2: bool) pub fn new(stream: T,
-> HttpChannel<T, H> { local: SocketAddr,
secure: bool,
peer: Option<SocketAddr>,
router: Rc<Vec<H>>,
http2: bool) -> HttpChannel<T, H>
{
if http2 { if http2 {
HttpChannel { HttpChannel {
proto: Some(HttpProtocol::H2( proto: Some(HttpProtocol::H2(
h2::Http2::new(stream, addr, router, Bytes::new()))) } h2::Http2::new(stream, local, secure, peer, router, Bytes::new()))) }
} else { } else {
HttpChannel { HttpChannel {
proto: Some(HttpProtocol::H1( proto: Some(HttpProtocol::H1(
h1::Http1::new(stream, addr, router))) } h1::Http1::new(stream, local, secure, peer, router))) }
} }
} }
} }
@ -105,8 +110,9 @@ impl<T, H> Future for HttpChannel<T, H>
let proto = self.proto.take().unwrap(); let proto = self.proto.take().unwrap();
match proto { match proto {
HttpProtocol::H1(h1) => { HttpProtocol::H1(h1) => {
let (stream, addr, router, buf) = h1.into_inner(); let (stream, local, secure, addr, router, buf) = h1.into_inner();
self.proto = Some(HttpProtocol::H2(h2::Http2::new(stream, addr, router, buf))); self.proto = Some(HttpProtocol::H2(
h2::Http2::new(stream, local, secure, addr, router, buf)));
self.poll() self.poll()
} }
_ => unreachable!() _ => unreachable!()

View File

@ -29,6 +29,24 @@ const MAX_HEADERS: usize = 100;
const MAX_PIPELINED_MESSAGES: usize = 16; const MAX_PIPELINED_MESSAGES: usize = 16;
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
bitflags! {
struct Flags: u8 {
const SECURE = 0b0000_0001;
const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const H2 = 0b0000_1000;
}
}
bitflags! {
struct EntryFlags: u8 {
const EOF = 0b0000_0001;
const ERROR = 0b0000_0010;
const FINISHED = 0b0000_0100;
}
}
pub(crate) enum Http1Result { pub(crate) enum Http1Result {
Done, Done,
Switch, Switch,
@ -41,44 +59,44 @@ enum Item {
} }
pub(crate) struct Http1<T: AsyncWrite + 'static, H: 'static> { pub(crate) struct Http1<T: AsyncWrite + 'static, H: 'static> {
flags: Flags,
router: Rc<Vec<H>>, router: Rc<Vec<H>>,
local: SocketAddr,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
stream: H1Writer<T>, stream: H1Writer<T>,
reader: Reader, reader: Reader,
read_buf: BytesMut, read_buf: BytesMut,
error: bool,
tasks: VecDeque<Entry>, tasks: VecDeque<Entry>,
keepalive: bool,
keepalive_timer: Option<Timeout>, keepalive_timer: Option<Timeout>,
h2: bool,
} }
struct Entry { struct Entry {
pipe: Pipeline, pipe: Pipeline,
eof: bool, flags: EntryFlags,
error: bool,
finished: bool,
} }
impl<T, H> Http1<T, H> impl<T, H> Http1<T, H>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static H: HttpHandler + 'static
{ {
pub fn new(stream: T, addr: Option<SocketAddr>, router: Rc<Vec<H>>) -> Self { pub fn new(stream: T, local: SocketAddr, secure: bool,
addr: Option<SocketAddr>, router: Rc<Vec<H>>) -> Self {
Http1{ router: router, Http1{ router: router,
local: local,
flags: if secure { Flags::SECURE | Flags::KEEPALIVE } else { Flags::KEEPALIVE },
addr: addr, addr: addr,
stream: H1Writer::new(stream), stream: H1Writer::new(stream),
reader: Reader::new(), reader: Reader::new(),
read_buf: BytesMut::new(), read_buf: BytesMut::new(),
error: false,
tasks: VecDeque::new(), tasks: VecDeque::new(),
keepalive: true, keepalive_timer: None }
keepalive_timer: None,
h2: false }
} }
pub fn into_inner(mut self) -> (T, Option<SocketAddr>, Rc<Vec<H>>, Bytes) { pub fn into_inner(mut self) -> (T, SocketAddr, bool,
(self.stream.unwrap(), self.addr, self.router, self.read_buf.freeze()) Option<SocketAddr>, Rc<Vec<H>>, Bytes) {
(self.stream.unwrap(), self.local,
self.flags.contains(Flags::SECURE),
self.addr, self.router, self.read_buf.freeze())
} }
pub fn poll(&mut self) -> Poll<Http1Result, ()> { pub fn poll(&mut self) -> Poll<Http1Result, ()> {
@ -103,8 +121,8 @@ impl<T, H> Http1<T, H>
while idx < self.tasks.len() { while idx < self.tasks.len() {
let item = &mut self.tasks[idx]; let item = &mut self.tasks[idx];
if !io && !item.eof { if !io && !item.flags.contains(EntryFlags::EOF) {
if item.error { if item.flags.contains(EntryFlags::ERROR) {
return Err(()) return Err(())
} }
@ -113,14 +131,16 @@ impl<T, H> Http1<T, H>
not_ready = false; not_ready = false;
// overide keep-alive state // overide keep-alive state
if self.keepalive { if self.stream.keepalive() {
self.keepalive = self.stream.keepalive(); self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
} }
self.stream = H1Writer::new(self.stream.unwrap()); self.stream = H1Writer::new(self.stream.unwrap());
item.eof = true; item.flags.insert(EntryFlags::EOF);
if ready { if ready {
item.finished = true; item.flags.insert(EntryFlags::FINISHED);
} }
}, },
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
@ -134,15 +154,15 @@ impl<T, H> Http1<T, H>
return Err(()) return Err(())
} }
} }
} else if !item.finished { } else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() { match item.pipe.poll() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
not_ready = false; not_ready = false;
item.finished = true; item.flags.insert(EntryFlags::FINISHED);
}, },
Err(err) => { Err(err) => {
item.error = true; item.flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
} }
} }
@ -152,7 +172,9 @@ impl<T, H> Http1<T, H>
// cleanup finished tasks // cleanup finished tasks
while !self.tasks.is_empty() { while !self.tasks.is_empty() {
if self.tasks[0].eof && self.tasks[0].finished { if self.tasks[0].flags.contains(EntryFlags::EOF) &&
self.tasks[0].flags.contains(EntryFlags::FINISHED)
{
self.tasks.pop_front(); self.tasks.pop_front();
} else { } else {
break break
@ -160,8 +182,8 @@ impl<T, H> Http1<T, H>
} }
// no keep-alive // no keep-alive
if !self.keepalive && self.tasks.is_empty() { if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() {
if self.h2 { if self.flags.contains(Flags::H2) {
return Ok(Async::Ready(Http1Result::Switch)) return Ok(Async::Ready(Http1Result::Switch))
} else { } else {
return Ok(Async::Ready(Http1Result::Done)) return Ok(Async::Ready(Http1Result::Done))
@ -169,7 +191,8 @@ impl<T, H> Http1<T, H>
} }
// read incoming data // read incoming data
while !self.error && !self.h2 && self.tasks.len() < MAX_PIPELINED_MESSAGES { while !self.flags.contains(Flags::ERROR) && !self.flags.contains(Flags::H2) &&
self.tasks.len() < MAX_PIPELINED_MESSAGES {
match self.reader.parse(self.stream.get_mut(), &mut self.read_buf) { match self.reader.parse(self.stream.get_mut(), &mut self.read_buf) {
Ok(Async::Ready(Item::Http1(mut req))) => { Ok(Async::Ready(Item::Http1(mut req))) => {
not_ready = false; not_ready = false;
@ -194,16 +217,14 @@ impl<T, H> Http1<T, H>
self.tasks.push_back( self.tasks.push_back(
Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
eof: false, flags: EntryFlags::empty()});
error: false,
finished: false});
} }
Ok(Async::Ready(Item::Http2)) => { Ok(Async::Ready(Item::Http2)) => {
self.h2 = true; self.flags.insert(Flags::H2);
} }
Err(ReaderError::Disconnect) => { Err(ReaderError::Disconnect) => {
not_ready = false; not_ready = false;
self.error = true; self.flags.insert(Flags::ERROR);
self.stream.disconnected(); self.stream.disconnected();
for entry in &mut self.tasks { for entry in &mut self.tasks {
entry.pipe.disconnected() entry.pipe.disconnected()
@ -218,26 +239,24 @@ impl<T, H> Http1<T, H>
} }
// kill keepalive // kill keepalive
self.keepalive = false; self.flags.remove(Flags::KEEPALIVE);
self.keepalive_timer.take(); self.keepalive_timer.take();
// on parse error, stop reading stream but tasks need to be completed // on parse error, stop reading stream but tasks need to be completed
self.error = true; self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() { if self.tasks.is_empty() {
if let ReaderError::Error(err) = err { if let ReaderError::Error(err) = err {
self.tasks.push_back( self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()), Entry {pipe: Pipeline::error(err.error_response()),
eof: false, flags: EntryFlags::empty()});
error: false,
finished: false});
} }
} }
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// start keep-alive timer, this is also slow request timeout // start keep-alive timer, this is also slow request timeout
if self.tasks.is_empty() { if self.tasks.is_empty() {
if self.keepalive { if self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() { if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer"); trace!("Start keep-alive timer");
let mut timeout = Timeout::new( let mut timeout = Timeout::new(
@ -259,10 +278,10 @@ impl<T, H> Http1<T, H>
// check for parse error // check for parse error
if self.tasks.is_empty() { if self.tasks.is_empty() {
if self.h2 { if self.flags.contains(Flags::H2) {
return Ok(Async::Ready(Http1Result::Switch)) return Ok(Async::Ready(Http1Result::Switch))
} }
if self.error || self.keepalive_timer.is_none() { if self.flags.contains(Flags::ERROR) || self.keepalive_timer.is_none() {
return Ok(Async::Ready(Http1Result::Done)) return Ok(Async::Ready(Http1Result::Done))
} }
} }

View File

@ -35,28 +35,30 @@ pub(crate) trait Writer {
fn poll_complete(&mut self) -> Poll<(), io::Error>; fn poll_complete(&mut self) -> Poll<(), io::Error>;
} }
bitflags! {
struct Flags: u8 {
const STARTED = 0b0000_0001;
const UPGRADE = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const DISCONNECTED = 0b0000_1000;
}
}
pub(crate) struct H1Writer<T: AsyncWrite> { pub(crate) struct H1Writer<T: AsyncWrite> {
flags: Flags,
stream: Option<T>, stream: Option<T>,
started: bool,
encoder: PayloadEncoder, encoder: PayloadEncoder,
upgrade: bool,
keepalive: bool,
disconnected: bool,
written: u64, written: u64,
headers_size: u64, headers_size: u32,
} }
impl<T: AsyncWrite> H1Writer<T> { impl<T: AsyncWrite> H1Writer<T> {
pub fn new(stream: T) -> H1Writer<T> { pub fn new(stream: T) -> H1Writer<T> {
H1Writer { H1Writer {
flags: Flags::empty(),
stream: Some(stream), stream: Some(stream),
started: false,
encoder: PayloadEncoder::default(), encoder: PayloadEncoder::default(),
upgrade: false,
keepalive: false,
disconnected: false,
written: 0, written: 0,
headers_size: 0, headers_size: 0,
} }
@ -75,7 +77,7 @@ impl<T: AsyncWrite> H1Writer<T> {
} }
pub fn keepalive(&self) -> bool { pub fn keepalive(&self) -> bool {
self.keepalive && !self.upgrade self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
} }
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> { fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
@ -105,9 +107,10 @@ impl<T: AsyncWrite> H1Writer<T> {
impl<T: AsyncWrite> Writer for H1Writer<T> { impl<T: AsyncWrite> Writer for H1Writer<T> {
#[cfg_attr(feature = "cargo-clippy", allow(cast_lossless))]
fn written(&self) -> u64 { fn written(&self) -> u64 {
if self.written > self.headers_size { if self.written > self.headers_size as u64 {
self.written - self.headers_size self.written - self.headers_size as u64
} else { } else {
0 0
} }
@ -119,9 +122,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
trace!("Prepare response with status: {:?}", msg.status()); trace!("Prepare response with status: {:?}", msg.status());
// prepare task // prepare task
self.started = true; self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(req, msg); self.encoder = PayloadEncoder::new(req, msg);
self.keepalive = msg.keep_alive().unwrap_or_else(|| req.keep_alive()); if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
self.flags.insert(Flags::KEEPALIVE);
}
// Connection upgrade // Connection upgrade
let version = msg.version().unwrap_or_else(|| req.version()); let version = msg.version().unwrap_or_else(|| req.version());
@ -129,7 +134,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("upgrade")); msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("upgrade"));
} }
// keep-alive // keep-alive
else if self.keepalive { else if self.flags.contains(Flags::KEEPALIVE) {
if version < Version::HTTP_11 { if version < Version::HTTP_11 {
msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("keep-alive")); msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("keep-alive"));
} }
@ -177,7 +182,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// msg eof // msg eof
buffer.extend(b"\r\n"); buffer.extend(b"\r\n");
self.headers_size = buffer.len() as u64; self.headers_size = buffer.len() as u32;
} }
trace!("Response: {:?}", msg); trace!("Response: {:?}", msg);
@ -193,8 +198,8 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
} }
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> { fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
if !self.disconnected { if !self.flags.contains(Flags::DISCONNECTED) {
if self.started { if self.flags.contains(Flags::STARTED) {
// TODO: add warning, write after EOF // TODO: add warning, write after EOF
self.encoder.write(payload)?; self.encoder.write(payload)?;
} else { } else {

View File

@ -25,13 +25,22 @@ use payload::{Payload, PayloadWriter};
const KEEPALIVE_PERIOD: u64 = 15; // seconds const KEEPALIVE_PERIOD: u64 = 15; // seconds
bitflags! {
struct Flags: u8 {
const SECURE = 0b0000_0001;
const DISCONNECTED = 0b0000_0010;
}
}
/// HTTP/2 Transport
pub(crate) struct Http2<T, H> pub(crate) struct Http2<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: 'static where T: AsyncRead + AsyncWrite + 'static, H: 'static
{ {
flags: Flags,
router: Rc<Vec<H>>, router: Rc<Vec<H>>,
local: SocketAddr,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
state: State<IoWrapper<T>>, state: State<IoWrapper<T>>,
disconnected: bool,
tasks: VecDeque<Entry>, tasks: VecDeque<Entry>,
keepalive_timer: Option<Timeout>, keepalive_timer: Option<Timeout>,
} }
@ -46,10 +55,12 @@ impl<T, H> Http2<T, H>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static H: HttpHandler + 'static
{ {
pub fn new(stream: T, addr: Option<SocketAddr>, router: Rc<Vec<H>>, buf: Bytes) -> Self { pub fn new(stream: T, local: SocketAddr, secure: bool,
Http2{ router: router, addr: Option<SocketAddr>, router: Rc<Vec<H>>, buf: Bytes) -> Self {
Http2{ flags: if secure { Flags::SECURE } else { Flags::empty() },
router: router,
local: local,
addr: addr, addr: addr,
disconnected: false,
tasks: VecDeque::new(), tasks: VecDeque::new(),
state: State::Handshake( state: State::Handshake(
Server::handshake(IoWrapper{unread: Some(buf), inner: stream})), Server::handshake(IoWrapper{unread: Some(buf), inner: stream})),
@ -80,33 +91,33 @@ impl<T, H> Http2<T, H>
// read payload // read payload
item.poll_payload(); item.poll_payload();
if !item.eof { if !item.flags.contains(EntryFlags::EOF) {
match item.task.poll_io(&mut item.stream) { match item.task.poll_io(&mut item.stream) {
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
item.eof = true; item.flags.insert(EntryFlags::EOF);
if ready { if ready {
item.finished = true; item.flags.insert(EntryFlags::FINISHED);
} }
not_ready = false; not_ready = false;
}, },
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Err(err) => { Err(err) => {
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
item.eof = true; item.flags.insert(EntryFlags::EOF);
item.error = true; item.flags.insert(EntryFlags::ERROR);
item.stream.reset(Reason::INTERNAL_ERROR); item.stream.reset(Reason::INTERNAL_ERROR);
} }
} }
} else if !item.finished { } else if !item.flags.contains(EntryFlags::FINISHED) {
match item.task.poll() { match item.task.poll() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
not_ready = false; not_ready = false;
item.finished = true; item.flags.insert(EntryFlags::FINISHED);
}, },
Err(err) => { Err(err) => {
item.error = true; item.flags.insert(EntryFlags::ERROR);
item.finished = true; item.flags.insert(EntryFlags::FINISHED);
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
} }
} }
@ -115,7 +126,10 @@ impl<T, H> Http2<T, H>
// cleanup finished tasks // cleanup finished tasks
while !self.tasks.is_empty() { while !self.tasks.is_empty() {
if self.tasks[0].eof && self.tasks[0].finished || self.tasks[0].error { if self.tasks[0].flags.contains(EntryFlags::EOF) &&
self.tasks[0].flags.contains(EntryFlags::FINISHED) ||
self.tasks[0].flags.contains(EntryFlags::ERROR)
{
self.tasks.pop_front(); self.tasks.pop_front();
} else { } else {
break break
@ -123,11 +137,11 @@ impl<T, H> Http2<T, H>
} }
// get request // get request
if !self.disconnected { if !self.flags.contains(Flags::DISCONNECTED) {
match server.poll() { match server.poll() {
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
not_ready = false; not_ready = false;
self.disconnected = true; self.flags.insert(Flags::DISCONNECTED);
for entry in &mut self.tasks { for entry in &mut self.tasks {
entry.task.disconnected() entry.task.disconnected()
} }
@ -156,7 +170,7 @@ impl<T, H> Http2<T, H>
} }
Err(err) => { Err(err) => {
trace!("Connection error: {}", err); trace!("Connection error: {}", err);
self.disconnected = true; self.flags.insert(Flags::DISCONNECTED);
for entry in &mut self.tasks { for entry in &mut self.tasks {
entry.task.disconnected() entry.task.disconnected()
} }
@ -166,7 +180,7 @@ impl<T, H> Http2<T, H>
} }
if not_ready { if not_ready {
if self.tasks.is_empty() && self.disconnected { if self.tasks.is_empty() && self.flags.contains(Flags::DISCONNECTED) {
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} else { } else {
return Ok(Async::NotReady) return Ok(Async::NotReady)
@ -196,16 +210,22 @@ impl<T, H> Http2<T, H>
} }
} }
bitflags! {
struct EntryFlags: u8 {
const EOF = 0b0000_0001;
const REOF = 0b0000_0010;
const ERROR = 0b0000_0100;
const FINISHED = 0b0000_1000;
}
}
struct Entry { struct Entry {
task: Pipeline, task: Pipeline,
payload: PayloadType, payload: PayloadType,
recv: RecvStream, recv: RecvStream,
stream: H2Writer, stream: H2Writer,
eof: bool,
error: bool,
finished: bool,
reof: bool,
capacity: usize, capacity: usize,
flags: EntryFlags,
} }
impl Entry { impl Entry {
@ -244,22 +264,19 @@ impl Entry {
payload: psender, payload: psender,
recv: recv, recv: recv,
stream: H2Writer::new(resp), stream: H2Writer::new(resp),
eof: false, flags: EntryFlags::empty(),
error: false,
finished: false,
reof: false,
capacity: 0, capacity: 0,
} }
} }
fn poll_payload(&mut self) { fn poll_payload(&mut self) {
if !self.reof { if !self.flags.contains(EntryFlags::REOF) {
match self.recv.poll() { match self.recv.poll() {
Ok(Async::Ready(Some(chunk))) => { Ok(Async::Ready(Some(chunk))) => {
self.payload.feed_data(chunk); self.payload.feed_data(chunk);
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.reof = true; self.flags.insert(EntryFlags::REOF);
}, },
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Err(err) => { Err(err) => {

View File

@ -16,14 +16,19 @@ use h1writer::{Writer, WriterState};
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
bitflags! {
struct Flags: u8 {
const STARTED = 0b0000_0001;
const DISCONNECTED = 0b0000_0010;
const EOF = 0b0000_0100;
}
}
pub(crate) struct H2Writer { pub(crate) struct H2Writer {
respond: Respond<Bytes>, respond: Respond<Bytes>,
stream: Option<SendStream<Bytes>>, stream: Option<SendStream<Bytes>>,
started: bool,
encoder: PayloadEncoder, encoder: PayloadEncoder,
disconnected: bool, flags: Flags,
eof: bool,
written: u64, written: u64,
} }
@ -33,10 +38,8 @@ impl H2Writer {
H2Writer { H2Writer {
respond: respond, respond: respond,
stream: None, stream: None,
started: false,
encoder: PayloadEncoder::default(), encoder: PayloadEncoder::default(),
disconnected: false, flags: Flags::empty(),
eof: true,
written: 0, written: 0,
} }
} }
@ -48,7 +51,7 @@ impl H2Writer {
} }
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> { fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
if !self.started { if !self.flags.contains(Flags::STARTED) {
return Ok(WriterState::Done) return Ok(WriterState::Done)
} }
@ -56,7 +59,7 @@ impl H2Writer {
let buffer = self.encoder.get_mut(); let buffer = self.encoder.get_mut();
if buffer.is_empty() { if buffer.is_empty() {
if self.eof { if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true); let _ = stream.send_data(Bytes::new(), true);
} }
return Ok(WriterState::Done) return Ok(WriterState::Done)
@ -77,7 +80,7 @@ impl H2Writer {
Ok(Async::Ready(Some(cap))) => { Ok(Async::Ready(Some(cap))) => {
let len = buffer.len(); let len = buffer.len();
let bytes = buffer.split_to(cmp::min(cap, len)); let bytes = buffer.split_to(cmp::min(cap, len));
let eof = buffer.is_empty() && self.eof; let eof = buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64; self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) { if let Err(err) = stream.send_data(bytes.freeze(), eof) {
@ -111,9 +114,11 @@ impl Writer for H2Writer {
trace!("Prepare response with status: {:?}", msg.status()); trace!("Prepare response with status: {:?}", msg.status());
// prepare response // prepare response
self.started = true; self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(req, msg); self.encoder = PayloadEncoder::new(req, msg);
self.eof = if let Body::Empty = *msg.body() { true } else { false }; if let Body::Empty = *msg.body() {
self.flags.insert(Flags::EOF);
}
// http2 specific // http2 specific
msg.headers_mut().remove(CONNECTION); msg.headers_mut().remove(CONNECTION);
@ -140,7 +145,7 @@ impl Writer for H2Writer {
resp.headers_mut().insert(key, value.clone()); resp.headers_mut().insert(key, value.clone());
} }
match self.respond.send_response(resp, self.eof) { match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) {
Ok(stream) => Ok(stream) =>
self.stream = Some(stream), self.stream = Some(stream),
Err(_) => Err(_) =>
@ -151,7 +156,7 @@ impl Writer for H2Writer {
if msg.body().is_binary() { if msg.body().is_binary() {
if let Body::Binary(bytes) = msg.replace_body(Body::Empty) { if let Body::Binary(bytes) = msg.replace_body(Body::Empty) {
self.eof = true; self.flags.insert(Flags::EOF);
self.encoder.write(bytes.as_ref())?; self.encoder.write(bytes.as_ref())?;
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE));
@ -164,8 +169,8 @@ impl Writer for H2Writer {
} }
fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> { fn write(&mut self, payload: &[u8]) -> Result<WriterState, io::Error> {
if !self.disconnected { if !self.flags.contains(Flags::DISCONNECTED) {
if self.started { if self.flags.contains(Flags::STARTED) {
// TODO: add warning, write after EOF // TODO: add warning, write after EOF
self.encoder.write(payload)?; self.encoder.write(payload)?;
} else { } else {
@ -184,7 +189,7 @@ impl Writer for H2Writer {
fn write_eof(&mut self) -> Result<WriterState, io::Error> { fn write_eof(&mut self) -> Result<WriterState, io::Error> {
self.encoder.write_eof()?; self.encoder.write_eof()?;
self.eof = true; self.flags.insert(Flags::EOF);
if !self.encoder.is_eof() { if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached")) "Last payload item, but eof is not reached"))

View File

@ -11,6 +11,8 @@ extern crate bytes;
extern crate sha1; extern crate sha1;
extern crate regex; extern crate regex;
#[macro_use] #[macro_use]
extern crate bitflags;
#[macro_use]
extern crate futures; extern crate futures;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
@ -61,7 +63,6 @@ mod route;
mod router; mod router;
mod param; mod param;
mod resource; mod resource;
// mod recognizer;
mod handler; mod handler;
mod pipeline; mod pipeline;
mod server; mod server;

View File

@ -26,7 +26,6 @@ use tokio_openssl::{SslStream, SslAcceptorExt};
use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; use channel::{HttpChannel, HttpHandler, IntoHttpHandler};
/// An HTTP Server /// An HTTP Server
/// ///
/// `T` - async stream, anything that implements `AsyncRead` + `AsyncWrite`. /// `T` - async stream, anything that implements `AsyncRead` + `AsyncWrite`.
@ -64,12 +63,15 @@ impl<T, A, H> HttpServer<T, A, H>
H: HttpHandler, H: HttpHandler,
{ {
/// Start listening for incomming connections from stream. /// Start listening for incomming connections from stream.
pub fn serve_incoming<S, Addr>(self, stream: S) -> io::Result<Addr> pub fn serve_incoming<S, Addr>(self, stream: S, secure: bool) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>, where Self: ActorAddress<Self, Addr>,
S: Stream<Item=(T, A), Error=io::Error> + 'static S: Stream<Item=(T, A), Error=io::Error> + 'static
{ {
Ok(HttpServer::create(move |ctx| { Ok(HttpServer::create(move |ctx| {
ctx.add_stream(stream.map(|(t, _)| IoStream(t, None, false))); let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
ctx.add_stream(stream.map(
move |(t, _)| IoStream{io: t, srv: addr,
peer: None, http2: false, secure: secure}));
self self
})) }))
} }
@ -114,7 +116,9 @@ impl<H: HttpHandler> HttpServer<TcpStream, net::SocketAddr, H> {
Ok(HttpServer::create(move |ctx| { Ok(HttpServer::create(move |ctx| {
for (addr, tcp) in addrs { for (addr, tcp) in addrs {
info!("Starting http server on {}", addr); info!("Starting http server on {}", addr);
ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, Some(a), false))); ctx.add_stream(tcp.incoming().map(
move |(t, a)| IoStream{io: t, srv: addr,
peer: Some(a), http2: false, secure: false}));
} }
self self
})) }))
@ -144,15 +148,15 @@ impl<H: HttpHandler> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H> {
}; };
Ok(HttpServer::create(move |ctx| { Ok(HttpServer::create(move |ctx| {
for (addr, tcp) in addrs { for (srv, tcp) in addrs {
info!("Starting tls http server on {}", addr); info!("Starting tls http server on {}", srv);
let acc = acceptor.clone(); let acc = acceptor.clone();
ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| { ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| {
TlsAcceptorExt::accept_async(acc.as_ref(), stream) TlsAcceptorExt::accept_async(acc.as_ref(), stream)
.map(move |t| { .map(move |t|
IoStream(t, Some(addr), false) IoStream{io: t, srv: srv.clone(),
}) peer: Some(addr), http2: false, secure: true})
.map_err(|err| { .map_err(|err| {
trace!("Error during handling tls connection: {}", err); trace!("Error during handling tls connection: {}", err);
io::Error::new(io::ErrorKind::Other, err) io::Error::new(io::ErrorKind::Other, err)
@ -191,8 +195,8 @@ impl<H: HttpHandler> HttpServer<SslStream<TcpStream>, net::SocketAddr, H> {
}; };
Ok(HttpServer::create(move |ctx| { Ok(HttpServer::create(move |ctx| {
for (addr, tcp) in addrs { for (srv, tcp) in addrs {
info!("Starting tls http server on {}", addr); info!("Starting tls http server on {}", srv);
let acc = acceptor.clone(); let acc = acceptor.clone();
ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| { ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| {
@ -205,7 +209,8 @@ impl<H: HttpHandler> HttpServer<SslStream<TcpStream>, net::SocketAddr, H> {
} else { } else {
false false
}; };
IoStream(stream, Some(addr), http2) IoStream{io: stream, srv: srv.clone(),
peer: Some(addr), http2: http2, secure: true}
}) })
.map_err(|err| { .map_err(|err| {
trace!("Error during handling tls connection: {}", err); trace!("Error during handling tls connection: {}", err);
@ -218,7 +223,13 @@ impl<H: HttpHandler> HttpServer<SslStream<TcpStream>, net::SocketAddr, H> {
} }
} }
struct IoStream<T>(T, Option<SocketAddr>, bool); struct IoStream<T> {
io: T,
srv: SocketAddr,
peer: Option<SocketAddr>,
http2: bool,
secure: bool,
}
impl<T> ResponseType for IoStream<T> impl<T> ResponseType for IoStream<T>
where T: AsyncRead + AsyncWrite + 'static where T: AsyncRead + AsyncWrite + 'static
@ -245,7 +256,8 @@ impl<T, A, H> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H>
-> Response<Self, IoStream<T>> -> Response<Self, IoStream<T>>
{ {
Arbiter::handle().spawn( Arbiter::handle().spawn(
HttpChannel::new(msg.0, msg.1, Rc::clone(&self.h), msg.2)); HttpChannel::new(msg.io, msg.srv, msg.secure,
msg.peer, Rc::clone(&self.h), msg.http2));
Self::empty() Self::empty()
} }
} }

View File

@ -39,7 +39,7 @@ fn test_serve_incoming() {
Application::new("/") Application::new("/")
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))); .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk)));
let tcp = TcpListener::from_listener(tcp, &addr2, Arbiter::handle()).unwrap(); let tcp = TcpListener::from_listener(tcp, &addr2, Arbiter::handle()).unwrap();
srv.serve_incoming::<_, ()>(tcp.incoming()).unwrap(); srv.serve_incoming::<_, ()>(tcp.incoming(), false).unwrap();
sys.run(); sys.run();
}); });