From 0b5f0c4f220c95546185c4af987fd097c46cb9ef Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 6 Oct 2017 21:48:14 -0700 Subject: [PATCH] initial implementation --- Cargo.toml | 15 +- README.md | 2 +- src/application.rs | 97 ++++++++++++ src/context.rs | 230 +++++++++++++++++++++++++++ src/date.rs | 60 +++++++ src/decode.rs | 268 +++++++++++++++++++++++++++++++ src/error.rs | 173 ++++++++++++++++++++ src/httpcodes.rs | 31 ++++ src/httpmessage.rs | 306 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 38 +++++ src/main.rs | 80 ++++++++++ src/reader.rs | 343 ++++++++++++++++++++++++++++++++++++++++ src/resource.rs | 122 ++++++++++++++ src/route.rs | 70 +++++++++ src/router.rs | 97 ++++++++++++ src/server.rs | 137 ++++++++++++++++ src/task.rs | 384 +++++++++++++++++++++++++++++++++++++++++++++ 17 files changed, 2451 insertions(+), 2 deletions(-) create mode 100644 src/application.rs create mode 100644 src/context.rs create mode 100644 src/date.rs create mode 100644 src/decode.rs create mode 100644 src/error.rs create mode 100644 src/httpcodes.rs create mode 100644 src/httpmessage.rs create mode 100644 src/main.rs create mode 100644 src/reader.rs create mode 100644 src/resource.rs create mode 100644 src/route.rs create mode 100644 src/router.rs create mode 100644 src/server.rs create mode 100644 src/task.rs diff --git a/Cargo.toml b/Cargo.toml index ff4f9d1c9..a315dce5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,16 +17,29 @@ build = "build.rs" name = "actix_http" path = "src/lib.rs" +[[bin]] +name = "test" +path = "src/main.rs" + [dependencies] +time = "0.1" +http = "0.1" +httparse = "*" +hyper = "0.11" +route-recognizer = "0.1" + # tokio bytes = "0.4" -mio = "0.6" futures = "0.1" tokio-core = "0.1" tokio-io = "0.1" +tokio-proto = "0.1" # other log = "0.3" +env_logger = "*" + +#actix = { git="https://github.com/fafhrd91/actix.git" } [dependencies.actix] #path = "../actix" diff --git a/README.md b/README.md index 8573af66f..f0e832f11 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Actix http is a http framework for Actix framework. * [API Documentation](http://fafhrd91.github.io/actix-http/actix_http/) -* Cargo package: [actix](https://crates.io/crates/actix-http) +* Cargo package: [actix-http](https://crates.io/crates/actix-http) --- diff --git a/src/application.rs b/src/application.rs new file mode 100644 index 000000000..83fb1e90b --- /dev/null +++ b/src/application.rs @@ -0,0 +1,97 @@ +use std::rc::Rc; +use std::string::ToString; +use std::collections::HashMap; + +use route_recognizer::Router; + +use task::Task; +use route::{Payload, RouteHandler}; +use router::HttpHandler; +use resource::HttpResource; +use httpmessage::HttpRequest; + + +/// Application +pub struct HttpApplication { + state: S, + default: HttpResource, + resources: HashMap>, +} + +impl HttpApplication where S: 'static +{ + pub(crate) fn prepare(self, prefix: String) -> Box { + let mut router = Router::new(); + let prefix = if prefix.ends_with('/') {prefix } else { prefix + "/" }; + + for (path, handler) in self.resources { + let path = prefix.clone() + path.trim_left_matches('/'); + router.add(path.as_str(), handler); + } + + Box::new( + InnerApplication { + state: Rc::new(self.state), + default: self.default, + router: router } + ) + } +} + +impl HttpApplication<()> { + pub fn no_state() -> Self { + HttpApplication { + state: (), + default: HttpResource::default(), + resources: HashMap::new(), + } + } +} + +impl HttpApplication where S: 'static { + + pub fn new(state: S) -> HttpApplication { + HttpApplication { + state: state, + default: HttpResource::default(), + resources: HashMap::new(), + } + } + + pub fn add(&mut self, path: P) -> &mut HttpResource + { + let path = path.to_string(); + + // add resource + if !self.resources.contains_key(&path) { + self.resources.insert(path.clone(), HttpResource::default()); + } + + self.resources.get_mut(&path).unwrap() + } + + /// Default resource + pub fn default(&mut self) -> &mut HttpResource { + &mut self.default + } +} + + +pub(crate) +struct InnerApplication { + state: Rc, + default: HttpResource, + router: Router>, +} + + +impl HttpHandler for InnerApplication { + + fn handle(&self, req: HttpRequest, payload: Option) -> Task { + if let Ok(h) = self.router.recognize(req.path()) { + h.handler.handle(req.with_params(h.params), payload, Rc::clone(&self.state)) + } else { + self.default.handle(req, payload, Rc::clone(&self.state)) + } + } +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 000000000..fd225b453 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,230 @@ +use std; +use std::rc::Rc; +use std::collections::VecDeque; +use futures::{Async, Stream, Poll}; + +use bytes::Bytes; +use actix::{Actor, ActorState, ActorContext, AsyncActorContext}; +use actix::fut::ActorFuture; +use actix::dev::{AsyncContextApi, ActorAddressCell}; + +use route::{Route, Frame}; +use httpmessage::HttpMessage; + + +/// Actor execution context +pub struct HttpContext where A: Actor> + Route, +{ + act: A, + state: ActorState, + items: Vec>>, + address: ActorAddressCell, + stream: VecDeque, + app_state: Rc<::State>, +} + + +impl ActorContext for HttpContext where A: Actor + Route +{ + /// Stop actor execution + fn stop(&mut self) { + self.address.close(); + if self.state == ActorState::Running { + self.state = ActorState::Stopping; + } + } + + /// Terminate actor execution + fn terminate(&mut self) { + self.address.close(); + self.items.clear(); + self.state = ActorState::Stopped; + } + + /// Actor execution state + fn state(&self) -> ActorState { + self.state + } +} + +impl AsyncActorContext for HttpContext where A: Actor + Route +{ + fn spawn(&mut self, fut: F) + where F: ActorFuture + 'static + { + if self.state == ActorState::Stopped { + error!("Context::spawn called for stopped actor."); + } else { + self.items.push(Box::new(fut)) + } + } +} + +impl AsyncContextApi for HttpContext where A: Actor + Route { + fn address_cell(&mut self) -> &mut ActorAddressCell { + &mut self.address + } +} + +impl HttpContext where A: Actor + Route { + + pub(crate) fn new(act: A, state: Rc<::State>) -> HttpContext + { + HttpContext { + act: act, + state: ActorState::Started, + items: Vec::new(), + address: ActorAddressCell::new(), + stream: VecDeque::new(), + app_state: state, + } + } + + pub(crate) fn replace_actor(&mut self, srv: A) -> A { + std::mem::replace(&mut self.act, srv) + } +} + +impl HttpContext where A: Actor + Route { + + /// Shared application state + pub fn state(&self) -> &::State { + &self.app_state + } + + /// Start response processing + pub fn start(&mut self, response: HttpMessage) { + self.stream.push_back(Frame::Message(response)) + } + + /// Write payload + pub fn write(&mut self, data: Bytes) { + self.stream.push_back(Frame::Payload(Some(data))) + } + + /// Completed + pub fn write_eof(&mut self) { + self.stream.push_back(Frame::Payload(None)) + } +} + +impl Stream for HttpContext where A: Actor + Route +{ + type Item = Frame; + type Error = std::io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let ctx: &mut HttpContext = unsafe { + std::mem::transmute(self as &mut HttpContext) + }; + + // update state + match self.state { + ActorState::Started => { + Actor::started(&mut self.act, ctx); + self.state = ActorState::Running; + }, + ActorState::Stopping => { + Actor::stopping(&mut self.act, ctx); + } + _ => () + } + + let mut prep_stop = false; + loop { + let mut not_ready = true; + + if let Ok(Async::Ready(_)) = self.address.poll(&mut self.act, ctx) { + not_ready = false + } + + // check secondary streams + let mut idx = 0; + let mut len = self.items.len(); + loop { + if idx >= len { + break + } + + let (drop, item) = match self.items[idx].poll(&mut self.act, ctx) { + Ok(val) => match val { + Async::Ready(_) => { + not_ready = false; + (true, None) + } + Async::NotReady => (false, None), + }, + Err(_) => (true, None) + }; + + // we have new pollable item + if let Some(item) = item { + self.items.push(item); + } + + // number of items could be different, context can add more items + len = self.items.len(); + + // item finishes, we need to remove it, + // replace current item with last item + if drop { + len -= 1; + if idx >= len { + self.items.pop(); + break + } else { + self.items[idx] = self.items.pop().unwrap(); + } + } else { + idx += 1; + } + } + + // are we done + if !not_ready { + continue + } + + // get frame + if let Some(frame) = self.stream.pop_front() { + return Ok(Async::Ready(Some(frame))) + } + + // check state + match self.state { + ActorState::Stopped => { + self.state = ActorState::Stopped; + Actor::stopped(&mut self.act, ctx); + return Ok(Async::Ready(None)) + }, + ActorState::Stopping => { + if prep_stop { + if self.address.connected() || !self.items.is_empty() { + self.state = ActorState::Running; + continue + } else { + self.state = ActorState::Stopped; + Actor::stopped(&mut self.act, ctx); + return Ok(Async::Ready(None)) + } + } else { + Actor::stopping(&mut self.act, ctx); + prep_stop = true; + continue + } + }, + ActorState::Running => { + if !self.address.connected() && self.items.is_empty() { + self.state = ActorState::Stopping; + Actor::stopping(&mut self.act, ctx); + prep_stop = true; + continue + } + }, + _ => (), + } + + return Ok(Async::NotReady) + } + } +} diff --git a/src/date.rs b/src/date.rs new file mode 100644 index 000000000..294efa212 --- /dev/null +++ b/src/date.rs @@ -0,0 +1,60 @@ +use std::cell::RefCell; +use std::fmt::{self, Write}; +use std::str; + +use time::{self, Duration}; +use bytes::BytesMut; + +// "Sun, 06 Nov 1994 08:49:37 GMT".len() +pub const DATE_VALUE_LENGTH: usize = 29; + +pub fn extend(dst: &mut BytesMut) { + CACHED.with(|cache| { + let mut cache = cache.borrow_mut(); + let now = time::get_time(); + if now > cache.next_update { + cache.update(now); + } + dst.extend_from_slice(cache.buffer()); + }) +} + +struct CachedDate { + bytes: [u8; DATE_VALUE_LENGTH], + pos: usize, + next_update: time::Timespec, +} + +thread_local!(static CACHED: RefCell = RefCell::new(CachedDate { + bytes: [0; DATE_VALUE_LENGTH], + pos: 0, + next_update: time::Timespec::new(0, 0), +})); + +impl CachedDate { + fn buffer(&self) -> &[u8] { + &self.bytes[..] + } + + fn update(&mut self, now: time::Timespec) { + self.pos = 0; + write!(self, "{}", time::at_utc(now).rfc822()).unwrap(); + assert_eq!(self.pos, DATE_VALUE_LENGTH); + self.next_update = now + Duration::seconds(1); + self.next_update.nsec = 0; + } +} + +impl fmt::Write for CachedDate { + fn write_str(&mut self, s: &str) -> fmt::Result { + let len = s.len(); + self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); + self.pos += len; + Ok(()) + } +} + +#[test] +fn test_date_len() { + assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len()); +} diff --git a/src/decode.rs b/src/decode.rs new file mode 100644 index 000000000..eb458fa25 --- /dev/null +++ b/src/decode.rs @@ -0,0 +1,268 @@ +#![allow(dead_code)] + +use std::{io, usize}; + +use futures::{Async, Poll}; +use bytes::{Bytes, BytesMut}; + +use self::Kind::{Length, Chunked, Eof}; + +/// Decoders to handle different Transfer-Encodings. +/// +/// If a message body does not include a Transfer-Encoding, it *should* +/// include a Content-Length header. +#[derive(Debug, Clone, PartialEq)] +pub struct Decoder { + kind: Kind, +} + +impl Decoder { + pub fn length(x: u64) -> Decoder { + Decoder { kind: Kind::Length(x) } + } + + pub fn chunked() -> Decoder { + Decoder { kind: Kind::Chunked(ChunkedState::Size, 0) } + } + + pub fn eof() -> Decoder { + Decoder { kind: Kind::Eof(false) } + } +} + +#[derive(Debug, Clone, PartialEq)] +enum Kind { + /// A Reader used when a Content-Length header is passed with a positive integer. + Length(u64), + /// A Reader used when Transfer-Encoding is `chunked`. + Chunked(ChunkedState, u64), + /// A Reader used for responses that don't indicate a length or chunked. + /// + /// Note: This should only used for `Response`s. It is illegal for a + /// `Request` to be made with both `Content-Length` and + /// `Transfer-Encoding: chunked` missing, as explained from the spec: + /// + /// > If a Transfer-Encoding header field is present in a response and + /// > the chunked transfer coding is not the final encoding, the + /// > message body length is determined by reading the connection until + /// > it is closed by the server. If a Transfer-Encoding header field + /// > is present in a request and the chunked transfer coding is not + /// > the final encoding, the message body length cannot be determined + /// > reliably; the server MUST respond with the 400 (Bad Request) + /// > status code and then close the connection. + Eof(bool), +} + +#[derive(Debug, PartialEq, Clone)] +enum ChunkedState { + Size, + SizeLws, + Extension, + SizeLf, + Body, + BodyCr, + BodyLf, + EndCr, + EndLf, + End, +} + +impl Decoder { + pub fn is_eof(&self) -> bool { + trace!("is_eof? {:?}", self); + match self.kind { + Length(0) | + Chunked(ChunkedState::End, _) | + Eof(true) => true, + _ => false, + } + } +} + +impl Decoder { + pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { + match self.kind { + Length(ref mut remaining) => { + trace!("Sized read, remaining={:?}", remaining); + if *remaining == 0 { + Ok(Async::Ready(None)) + } else { + let len = body.len() as u64; + let buf; + if *remaining > len { + buf = body.take().freeze(); + *remaining -= len; + } else { + buf = body.split_to(*remaining as usize).freeze(); + *remaining = 0; + } + trace!("Length read: {}", buf.len()); + Ok(Async::Ready(Some(buf))) + } + } + Chunked(ref mut state, ref mut size) => { + loop { + let mut buf = None; + // advances the chunked state + *state = try_ready!(state.step(body, size, &mut buf)); + if *state == ChunkedState::End { + trace!("end of chunked"); + return Ok(Async::Ready(None)); + } + if let Some(buf) = buf { + return Ok(Async::Ready(Some(buf))); + } + if body.is_empty() { + return Ok(Async::NotReady); + } + } + } + Eof(ref mut is_eof) => { + if *is_eof { + Ok(Async::Ready(None)) + } else if !body.is_empty() { + Ok(Async::Ready(Some(body.take().freeze()))) + } else { + Ok(Async::NotReady) + } + } + } + } +} + +macro_rules! byte ( + ($rdr:ident) => ({ + if $rdr.len() > 0 { + let b = $rdr[1]; + $rdr.split_to(1); + b + } else { + return Ok(Async::NotReady) + } + }) +); + +impl ChunkedState { + fn step(&self, body: &mut BytesMut, size: &mut u64, buf: &mut Option) + -> Poll + { + use self::ChunkedState::*; + match *self { + Size => ChunkedState::read_size(body, size), + SizeLws => ChunkedState::read_size_lws(body), + Extension => ChunkedState::read_extension(body), + SizeLf => ChunkedState::read_size_lf(body, size), + Body => ChunkedState::read_body(body, size, buf), + BodyCr => ChunkedState::read_body_cr(body), + BodyLf => ChunkedState::read_body_lf(body), + EndCr => ChunkedState::read_end_cr(body), + EndLf => ChunkedState::read_end_lf(body), + End => Ok(Async::Ready(ChunkedState::End)), + } + } + fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { + trace!("Read chunk hex size"); + let radix = 16; + match byte!(rdr) { + b @ b'0'...b'9' => { + *size *= radix; + *size += u64::from(b - b'0'); + } + b @ b'a'...b'f' => { + *size *= radix; + *size += u64::from(b + 10 - b'a'); + } + b @ b'A'...b'F' => { + *size *= radix; + *size += u64::from(b + 10 - b'A'); + } + b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => return Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), + _ => { + return Err(io::Error::new(io::ErrorKind::InvalidInput, + "Invalid chunk size line: Invalid Size")); + } + } + Ok(Async::Ready(ChunkedState::Size)) + } + fn read_size_lws(rdr: &mut BytesMut) -> Poll { + trace!("read_size_lws"); + match byte!(rdr) { + // LWS can follow the chunk size, but no more digits can come + b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => { + Err(io::Error::new(io::ErrorKind::InvalidInput, + "Invalid chunk size linear white space")) + } + } + } + fn read_extension(rdr: &mut BytesMut) -> Poll { + trace!("read_extension"); + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions + } + } + fn read_size_lf(rdr: &mut BytesMut, size: &mut u64) -> Poll { + trace!("Chunk size is {:?}", size); + match byte!(rdr) { + b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), + b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF")), + } + } + + fn read_body(rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option) + -> Poll + { + trace!("Chunked read, remaining={:?}", rem); + + let len = rdr.len() as u64; + if len == 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + let slice; + if *rem > len { + slice = rdr.take().freeze(); + *rem -= len; + } else { + slice = rdr.split_to(*rem as usize).freeze(); + *rem = 0; + } + *buf = Some(slice); + if *rem > 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + Ok(Async::Ready(ChunkedState::BodyCr)) + } + } + } + + fn read_body_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR")), + } + } + fn read_body_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::Size)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF")), + } + } + fn read_end_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR")), + } + } + fn read_end_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::End)), + _ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF")), + } + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 000000000..b4525b300 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,173 @@ +//! Error and Result module. +use std::error::Error as StdError; +use std::fmt; +use std::io::Error as IoError; +use std::str::Utf8Error; +use std::string::FromUtf8Error; + +use httparse; + +use self::Error::{ + Method, + Uri, + Version, + Header, + Status, + Timeout, + Io, + TooLarge, + Incomplete, + Utf8 +}; + +/// Result type often returned from methods that can have error. +pub type Result = ::std::result::Result; + +/// A set of errors that can occur parsing HTTP streams. +#[derive(Debug)] +pub enum Error { + /// An invalid `Method`, such as `GE,T`. + Method, + /// An invalid `Uri`, such as `exam ple.domain`. + Uri, + /// An invalid `HttpVersion`, such as `HTP/1.1` + Version, + /// An invalid `Header`. + Header, + /// A message head is too large to be reasonable. + TooLarge, + /// A message reached EOF, but is not complete. + Incomplete, + /// An invalid `Status`, such as `1337 ELITE`. + Status, + /// A timeout occurred waiting for an IO event. + #[allow(dead_code)] + Timeout, + /// An `io::Error` that occurred while trying to read or write to a network stream. + Io(IoError), + /// Parsing a field as string failed + Utf8(Utf8Error), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Io(ref e) => fmt::Display::fmt(e, f), + Utf8(ref e) => fmt::Display::fmt(e, f), + ref e => f.write_str(e.description()), + } + } +} + +impl StdError for Error { + fn description(&self) -> &str { + match *self { + Method => "Invalid Method specified", + Version => "Invalid HTTP version specified", + Header => "Invalid Header provided", + TooLarge => "Message head is too large", + Status => "Invalid Status provided", + Incomplete => "Message is incomplete", + Timeout => "Timeout", + Uri => "Uri error", + Io(ref e) => e.description(), + Utf8(ref e) => e.description(), + } + } + + fn cause(&self) -> Option<&StdError> { + match *self { + Io(ref error) => Some(error), + Utf8(ref error) => Some(error), + _ => None, + } + } +} + +impl From for Error { + fn from(err: IoError) -> Error { + Io(err) + } +} + +impl From for Error { + fn from(err: Utf8Error) -> Error { + Utf8(err) + } +} + +impl From for Error { + fn from(err: FromUtf8Error) -> Error { + Utf8(err.utf8_error()) + } +} + +impl From for Error { + fn from(err: httparse::Error) -> Error { + match err { + httparse::Error::HeaderName | + httparse::Error::HeaderValue | + httparse::Error::NewLine | + httparse::Error::Token => Header, + httparse::Error::Status => Status, + httparse::Error::TooManyHeaders => TooLarge, + httparse::Error::Version => Version, + } + } +} + +#[cfg(test)] +mod tests { + use std::error::Error as StdError; + use std::io; + use httparse; + use super::Error; + use super::Error::*; + + #[test] + fn test_cause() { + let orig = io::Error::new(io::ErrorKind::Other, "other"); + let desc = orig.description().to_owned(); + let e = Io(orig); + assert_eq!(e.cause().unwrap().description(), desc); + } + + macro_rules! from { + ($from:expr => $error:pat) => { + match Error::from($from) { + e @ $error => { + assert!(e.description().len() >= 5); + } , + e => panic!("{:?}", e) + } + } + } + + macro_rules! from_and_cause { + ($from:expr => $error:pat) => { + match Error::from($from) { + e @ $error => { + let desc = e.cause().unwrap().description(); + assert_eq!(desc, $from.description().to_owned()); + assert_eq!(desc, e.description()); + }, + _ => panic!("{:?}", $from) + } + } + } + + #[test] + fn test_from() { + + from_and_cause!(io::Error::new(io::ErrorKind::Other, "other") => Io(..)); + + from!(httparse::Error::HeaderName => Header); + from!(httparse::Error::HeaderName => Header); + from!(httparse::Error::HeaderValue => Header); + from!(httparse::Error::NewLine => Header); + from!(httparse::Error::Status => Status); + from!(httparse::Error::Token => Header); + from!(httparse::Error::TooManyHeaders => TooLarge); + from!(httparse::Error::Version => Version); + } +} diff --git a/src/httpcodes.rs b/src/httpcodes.rs new file mode 100644 index 000000000..3f3d75bb1 --- /dev/null +++ b/src/httpcodes.rs @@ -0,0 +1,31 @@ +//! Basic http responses +#![allow(non_upper_case_globals)] +use std::rc::Rc; +use http::StatusCode; + +use task::Task; +use route::{Payload, RouteHandler}; +use httpmessage::{Body, HttpRequest, HttpMessage, IntoHttpMessage}; + +pub struct StaticResponse(StatusCode); + +pub const HTTPOk: StaticResponse = StaticResponse(StatusCode::OK); +pub const HTTPCreated: StaticResponse = StaticResponse(StatusCode::CREATED); +pub const HTTPNoContent: StaticResponse = StaticResponse(StatusCode::NO_CONTENT); +pub const HTTPBadRequest: StaticResponse = StaticResponse(StatusCode::BAD_REQUEST); +pub const HTTPNotFound: StaticResponse = StaticResponse(StatusCode::NOT_FOUND); +pub const HTTPMethodNotAllowed: StaticResponse = StaticResponse(StatusCode::METHOD_NOT_ALLOWED); + + +impl RouteHandler for StaticResponse { + fn handle(&self, req: HttpRequest, _: Option, _: Rc) -> Task + { + Task::reply(HttpMessage::new(req, self.0, Body::Empty), None) + } +} + +impl IntoHttpMessage for StaticResponse { + fn into_response(self, req: HttpRequest) -> HttpMessage { + HttpMessage::new(req, self.0, Body::Empty) + } +} diff --git a/src/httpmessage.rs b/src/httpmessage.rs new file mode 100644 index 000000000..ec41cb1f4 --- /dev/null +++ b/src/httpmessage.rs @@ -0,0 +1,306 @@ +//! Pieces pertaining to the HTTP message protocol. +use std::{io, mem}; +use std::str::FromStr; +use std::convert::Into; + +use bytes::Bytes; +use http::{Method, StatusCode, Version, Uri}; +use hyper::header::{Header, Headers}; +use hyper::header::{Connection, ConnectionOption, + Expect, Encoding, ContentLength, TransferEncoding}; + +use Params; +use error::Error; + +pub trait Message { + + fn version(&self) -> Version; + + fn headers(&self) -> &Headers; + + /// Checks if a connection should be kept alive. + fn should_keep_alive(&self) -> bool { + let ret = match (self.version(), self.headers().get::()) { + (Version::HTTP_10, None) => false, + (Version::HTTP_10, Some(conn)) + if !conn.contains(&ConnectionOption::KeepAlive) => false, + (Version::HTTP_11, Some(conn)) + if conn.contains(&ConnectionOption::Close) => false, + _ => true + }; + trace!("should_keep_alive(version={:?}, header={:?}) = {:?}", + self.version(), self.headers().get::(), ret); + ret + } + + /// Checks if a connection is expecting a `100 Continue` before sending its body. + #[inline] + fn expecting_continue(&self) -> bool { + let ret = match (self.version(), self.headers().get::()) { + (Version::HTTP_11, Some(&Expect::Continue)) => true, + _ => false + }; + trace!("expecting_continue(version={:?}, header={:?}) = {:?}", + self.version(), self.headers().get::(), ret); + ret + } + + fn is_chunked(&self) -> Result { + if let Some(&TransferEncoding(ref encodings)) = self.headers().get() { + // https://tools.ietf.org/html/rfc7230#section-3.3.3 + // If Transfer-Encoding header is present, and 'chunked' is + // not the final encoding, and this is a Request, then it is + // mal-formed. A server should responsed with 400 Bad Request. + if encodings.last() == Some(&Encoding::Chunked) { + Ok(true) + } else { + debug!("request with transfer-encoding header, but not chunked, bad request"); + Err(Error::Header) + } + } else { + Ok(false) + } + } + + fn is_upgrade(&self) -> bool { + if let Some(&Connection(ref conn)) = self.headers().get() { + conn.contains(&ConnectionOption::from_str("upgrade").unwrap()) + } else { + false + } + } +} + + +#[derive(Debug)] +/// An HTTP Request +pub struct HttpRequest { + version: Version, + method: Method, + uri: Uri, + headers: Headers, + params: Params, +} + +impl Message for HttpRequest { + fn version(&self) -> Version { + self.version + } + fn headers(&self) -> &Headers { + &self.headers + } +} + +impl HttpRequest { + /// Construct a new Request. + #[inline] + pub fn new(method: Method, uri: Uri, version: Version, headers: Headers) -> Self { + HttpRequest { + method: method, + uri: uri, + version: version, + headers: headers, + params: Params::new(), + } + } + + /// Read the Request Uri. + #[inline] + pub fn uri(&self) -> &Uri { &self.uri } + + /// Read the Request Version. + #[inline] + pub fn version(&self) -> Version { self.version } + + /// Read the Request headers. + #[inline] + pub fn headers(&self) -> &Headers { &self.headers } + + /// Read the Request method. + #[inline] + pub fn method(&self) -> &Method { &self.method } + + // /// The remote socket address of this request + // /// + // /// This is an `Option`, because some underlying transports may not have + // /// a socket address, such as Unix Sockets. + // /// + // /// This field is not used for outgoing requests. + // #[inline] + // pub fn remote_addr(&self) -> Option { self.remote_addr } + + /// The target path of this Request. + #[inline] + pub fn path(&self) -> &str { + self.uri.path() + } + + /// The query string of this Request. + #[inline] + pub fn query(&self) -> Option<&str> { + self.uri.query() + } + + /// Get a mutable reference to the Request headers. + #[inline] + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + #[inline] + pub fn params(&self) -> &Params { &self.params } + + pub fn with_params(self, params: Params) -> Self { + HttpRequest { + method: self.method, + uri: self.uri, + version: self.version, + headers: self.headers, + params: params + } + } +} + +#[derive(Debug)] +pub enum Body { + Empty, + Binary(Bytes), + Length(u64), + Streaming, +} + +impl Body { + pub fn has_body(&self) -> bool { + match *self { + Body::Length(_) | Body::Streaming => true, + _ => false + } + } +} + +pub trait IntoHttpMessage { + fn into_response(self, req: HttpRequest) -> HttpMessage; +} + +#[derive(Debug)] +/// An HTTP Response +pub struct HttpMessage { + request: HttpRequest, + pub version: Version, + pub headers: Headers, + pub status: StatusCode, + body: Body, + chunked: bool, + keep_alive: Option, + compression: Option, +} + +impl Message for HttpMessage { + fn version(&self) -> Version { + self.version + } + fn headers(&self) -> &Headers { + &self.headers + } +} + +impl HttpMessage { + /// Constructs a default response + #[inline] + pub fn new(request: HttpRequest, status: StatusCode, body: Body) -> HttpMessage { + let version = request.version; + HttpMessage { + request: request, + version: version, + headers: Default::default(), + status: status, + body: body, + chunked: false, + keep_alive: None, + compression: None, + } + } + + /// Get the HTTP version of this response. + #[inline] + pub fn version(&self) -> Version { + self.version + } + + /// Get the headers from the response. + #[inline] + pub fn headers(&self) -> &Headers { + &self.headers + } + + /// Get a mutable reference to the headers. + #[inline] + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + /// Get the status from the server. + #[inline] + pub fn status(&self) -> StatusCode { + self.status + } + + /// Set the `StatusCode` for this response. + #[inline] + pub fn set_status(&mut self, status: StatusCode) -> &mut Self { + self.status = status; + self + } + + /// Set a header and move the Response. + #[inline] + pub fn set_header(&mut self, header: H) -> &mut Self { + self.headers.set(header); + self + } + + /// Set the headers and move the Response. + #[inline] + pub fn with_headers(&mut self, headers: Headers) -> &mut Self { + self.headers = headers; + self + } + + /// Keep-alive status for this connection + pub fn keep_alive(&self) -> bool { + if let Some(ka) = self.keep_alive { + ka + } else { + self.request.should_keep_alive() + } + } + + /// Force close connection, even if it is marked as keep-alive + pub fn force_close(&mut self) { + self.keep_alive = Some(false); + } + + /// is chunked encoding enabled + pub fn chunked(&self) -> bool { + self.chunked + } + + /// Enables automatic chunked transfer encoding + pub fn enable_chunked_encoding(&mut self) -> Result<(), io::Error> { + if self.headers.has::() { + Err(io::Error::new(io::ErrorKind::Other, + "You can't enable chunked encoding when a content length is set")) + } else { + self.chunked = true; + Ok(()) + } + } + + pub fn body(&self) -> &Body { + &self.body + } + + pub fn set_body>(&mut self, body: B) -> Body { + mem::replace(&mut self.body, body.into()) + } +} diff --git a/src/lib.rs b/src/lib.rs index f36843b62..7bf89e0b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,39 @@ //! Actix http framework + +#[macro_use] +extern crate log; +extern crate time; +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate tokio_core; +extern crate tokio_io; +extern crate tokio_proto; +extern crate hyper; +extern crate http; +extern crate httparse; +extern crate route_recognizer; +extern crate actix; + +mod application; +mod context; +mod error; +mod date; +mod decode; +mod httpmessage; +mod resource; +mod route; +mod router; +mod task; +mod reader; +mod server; + +pub mod httpcodes; +pub use application::HttpApplication; +pub use route::{Route, RouteFactory, Payload, PayloadItem, Frame}; +pub use resource::{HttpResource, HttpResponse}; +pub use server::HttpServer; +pub use context::HttpContext; +pub use router::RoutingMap; +pub use route_recognizer::Params; +pub use httpmessage::{HttpRequest, HttpMessage, IntoHttpMessage}; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 000000000..4a7c128aa --- /dev/null +++ b/src/main.rs @@ -0,0 +1,80 @@ +#![allow(dead_code)] +extern crate actix; +extern crate actix_http; +extern crate tokio_core; +extern crate env_logger; + +use std::net; +use std::str::FromStr; + +use actix::prelude::*; +use actix_http::*; + +struct MyRoute {req: Option} + +impl Actor for MyRoute { + type Context = HttpContext; +} + +impl Route for MyRoute { + type State = (); + + fn request(req: HttpRequest, + payload: Option, + ctx: &mut HttpContext) -> HttpResponse + { + if let Some(pl) = payload { + ctx.add_stream(pl); + HttpResponse::Stream(MyRoute{req: Some(req)}) + } else { + HttpResponse::Reply(req, httpcodes::HTTPOk) + } + } +} + +impl ResponseType for MyRoute { + type Item = (); + type Error = (); +} + +impl StreamHandler for MyRoute {} + +impl Handler for MyRoute { + fn handle(&mut self, msg: PayloadItem, ctx: &mut HttpContext) + -> Response + { + println!("CHUNK: {:?}", msg); + if let Some(req) = self.req.take() { + ctx.start(httpcodes::HTTPOk.into_response(req)); + ctx.write_eof(); + } + + Response::Empty() + } +} + + +fn main() { + let _ = env_logger::init(); + + let sys = actix::System::new("http-example".to_owned()); + + let mut routes = RoutingMap::default(); + + let mut app = HttpApplication::no_state(); + app.add("/test") + .get::() + .post::(); + + routes.add("/blah", app); + + routes.add_resource("/test") + .post::(); + + let http = HttpServer::new(routes); + http.serve::<()>( + &net::SocketAddr::from_str("127.0.0.1:9080").unwrap()).unwrap(); + + println!("starting"); + let _ = sys.run(); +} diff --git a/src/reader.rs b/src/reader.rs new file mode 100644 index 000000000..d6ffca492 --- /dev/null +++ b/src/reader.rs @@ -0,0 +1,343 @@ +use std::{self, fmt, io, ptr}; + +use httparse; +use http::{Method, Version, Uri, HttpTryFrom}; +use bytes::{Bytes, BytesMut, BufMut}; +use futures::{Async, AsyncSink, Poll, Sink}; +use futures::unsync::mpsc::{channel, Sender}; +use tokio_io::AsyncRead; + +use hyper::header::{Headers, ContentLength}; + +use {Payload, PayloadItem}; +use error::{Error, Result}; +use decode::Decoder; +use httpmessage::{Message, HttpRequest}; + + +const MAX_HEADERS: usize = 100; +const INIT_BUFFER_SIZE: usize = 8192; +pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; + +struct PayloadInfo { + tx: Sender, + decoder: Decoder, + tmp_item: Option, +} + +pub struct Reader { + read_buf: BytesMut, + payload: Option, +} + +enum Decoding { + Paused, + Ready, + NotReady, +} + +impl Reader { + pub fn new() -> Reader { + Reader { + read_buf: BytesMut::new(), + payload: None, + } + } + + #[allow(dead_code)] + pub fn consume_leading_lines(&mut self) { + if !self.read_buf.is_empty() { + let mut i = 0; + while i < self.read_buf.len() { + match self.read_buf[i] { + b'\r' | b'\n' => i += 1, + _ => break, + } + } + self.read_buf.split_to(i); + } + } + + fn decode(&mut self) -> std::result::Result + { + if let Some(ref mut payload) = self.payload { + loop { + if let Some(item) = payload.tmp_item.take() { + let eof = item.is_eof(); + + match payload.tx.start_send(item) { + Ok(AsyncSink::NotReady(item)) => { + payload.tmp_item = Some(item); + return Ok(Decoding::Paused) + } + Ok(AsyncSink::Ready) => { + if eof { + return Ok(Decoding::Ready) + } + }, + Err(_) => return Err(Error::Incomplete), + } + } + + match payload.decoder.decode(&mut self.read_buf) { + Ok(Async::Ready(Some(bytes))) => { + match payload.tx.start_send(PayloadItem::Chunk(bytes)) { + Ok(AsyncSink::NotReady(item)) => { + payload.tmp_item = Some(item); + return Ok(Decoding::Paused) + } + Ok(AsyncSink::Ready) => { + continue + } + Err(_) => return Err(Error::Incomplete), + } + }, + Ok(Async::Ready(None)) => { + match payload.tx.start_send(PayloadItem::Eof) { + Ok(AsyncSink::NotReady(item)) => { + payload.tmp_item = Some(item); + return Ok(Decoding::Paused) + } + Ok(AsyncSink::Ready) => { + return Ok(Decoding::Ready) + } + Err(_) => return Err(Error::Incomplete), + } + }, + Ok(Async::NotReady) => return Ok(Decoding::NotReady), + Err(_) => return Err(Error::Incomplete), + } + } + } else { + return Ok(Decoding::Ready) + } + } + + pub fn parse(&mut self, io: &mut T) -> Poll<(HttpRequest, Option), Error> + where T: AsyncRead + { + loop { + match self.decode()? { + Decoding::Paused => return Ok(Async::NotReady), + Decoding::Ready => { + self.payload = None; + break + }, + Decoding::NotReady => { + if 0 == try_ready!(self.read_from_io(io)) { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, ParseEof).into()); + } + } + } + } + + loop { + match try!(parse(&mut self.read_buf)) { + Some((msg, decoder)) => { + let payload = if let Some(decoder) = decoder { + let (tx, rx) = channel(32); + let payload = PayloadInfo { + tx: tx, + decoder: decoder, + tmp_item: None, + }; + self.payload = Some(payload); + + loop { + match self.decode()? { + Decoding::Paused => + break, + Decoding::Ready => { + self.payload = None; + break + }, + Decoding::NotReady => { + match self.read_from_io(io) { + Ok(Async::Ready(0)) => { + trace!("parse eof"); + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, ParseEof).into()); + } + Ok(Async::Ready(_)) => { + continue + } + Ok(Async::NotReady) => break, + Err(err) => return Err(err.into()), + } + } + } + } + Some(rx) + } else { + None + }; + return Ok(Async::Ready((msg, payload))); + }, + None => { + if self.read_buf.capacity() >= MAX_BUFFER_SIZE { + debug!("MAX_BUFFER_SIZE reached, closing"); + return Err(Error::TooLarge); + } + }, + } + if 0 == try_ready!(self.read_from_io(io)) { + trace!("parse eof"); + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof).into()); + } + } + } + + fn read_from_io(&mut self, io: &mut T) -> Poll { + if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE { + self.read_buf.reserve(INIT_BUFFER_SIZE); + unsafe { // Zero out unused memory + let buf = self.read_buf.bytes_mut(); + let len = buf.len(); + ptr::write_bytes(buf.as_mut_ptr(), 0, len); + } + } + unsafe { + let n = match io.read(self.read_buf.bytes_mut()) { + Ok(n) => n, + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + return Ok(Async::NotReady); + } + return Err(e) + } + }; + self.read_buf.advance_mut(n); + Ok(Async::Ready(n)) + } + } +} + +#[derive(Debug)] +struct ParseEof; + +impl fmt::Display for ParseEof { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("parse eof") + } +} + +impl ::std::error::Error for ParseEof { + fn description(&self) -> &str { + "parse eof" + } +} + + +pub fn parse(buf: &mut BytesMut) -> Result)>> { + if buf.is_empty() { + return Ok(None); + } + + // Parse http message + let mut headers_indices = [HeaderIndices { + name: (0, 0), + value: (0, 0) + }; MAX_HEADERS]; + + let (len, method, path, version, headers_len) = { + let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; + trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); + let mut req = httparse::Request::new(&mut headers); + match try!(req.parse(buf)) { + httparse::Status::Complete(len) => { + trace!("Request.parse Complete({})", len); + let method = Method::try_from(req.method.unwrap()).map_err(|_| Error::Method)?; + let path = req.path.unwrap(); + let bytes_ptr = buf.as_ref().as_ptr() as usize; + let path_start = path.as_ptr() as usize - bytes_ptr; + let path_end = path_start + path.len(); + let path = (path_start, path_end); + + let version = if req.version.unwrap() == 1 { + Version::HTTP_11 + } else { + Version::HTTP_10 + }; + + record_header_indices(buf.as_ref(), req.headers, &mut headers_indices); + let headers_len = req.headers.len(); + (len, method, path, version, headers_len) + } + httparse::Status::Partial => return Ok(None), + } + }; + + let mut headers = Headers::with_capacity(headers_len); + let slice = buf.split_to(len).freeze(); + let path = slice.slice(path.0, path.1); + // path was found to be utf8 by httparse + let uri = Uri::from_shared(path).map_err(|_| Error::Uri)?; + + headers.extend(HeadersAsBytesIter { + headers: headers_indices[..headers_len].iter(), + slice: slice, + }); + + let msg = HttpRequest::new(method, uri, version, headers); + + let _upgrade = msg.is_upgrade(); + let chunked = msg.is_chunked()?; + + // Content-Length + if let Some(&ContentLength(len)) = msg.headers().get() { + if chunked { + return Err(Error::Header) + } + Ok(Some((msg, Some(Decoder::length(len))))) + } else if msg.headers().has::() { + debug!("illegal Content-Length: {:?}", msg.headers().get_raw("Content-Length")); + Err(Error::Header) + } else if chunked { + Ok(Some((msg, Some(Decoder::chunked())))) + } else { + Ok(Some((msg, None))) + } +} + +#[derive(Clone, Copy)] +struct HeaderIndices { + name: (usize, usize), + value: (usize, usize), +} + +fn record_header_indices(bytes: &[u8], + headers: &[httparse::Header], + indices: &mut [HeaderIndices]) +{ + let bytes_ptr = bytes.as_ptr() as usize; + for (header, indices) in headers.iter().zip(indices.iter_mut()) { + let name_start = header.name.as_ptr() as usize - bytes_ptr; + let name_end = name_start + header.name.len(); + indices.name = (name_start, name_end); + let value_start = header.value.as_ptr() as usize - bytes_ptr; + let value_end = value_start + header.value.len(); + indices.value = (value_start, value_end); + } +} + +struct HeadersAsBytesIter<'a> { + headers: ::std::slice::Iter<'a, HeaderIndices>, + slice: Bytes, +} + +impl<'a> Iterator for HeadersAsBytesIter<'a> { + type Item = (&'a str, Bytes); + fn next(&mut self) -> Option { + self.headers.next().map(|header| { + let name = unsafe { + let bytes = ::std::slice::from_raw_parts( + self.slice.as_ref().as_ptr().offset(header.name.0 as isize), + header.name.1 - header.name.0 + ); + ::std::str::from_utf8_unchecked(bytes) + }; + (name, self.slice.slice(header.value.0, header.value.1)) + }) + } +} diff --git a/src/resource.rs b/src/resource.rs new file mode 100644 index 000000000..c6a4ea81e --- /dev/null +++ b/src/resource.rs @@ -0,0 +1,122 @@ +use std::mem; +use std::rc::Rc; +use std::marker::PhantomData; +use std::collections::HashMap; + +use actix::Actor; +use bytes::Bytes; +use http::Method; + +use task::Task; +use route::{Route, Payload, RouteHandler}; +use context::HttpContext; +use httpcodes::HTTPMethodNotAllowed; +use httpmessage::{HttpRequest, HttpMessage, IntoHttpMessage}; + +/// Resource +pub struct HttpResource { + state: PhantomData, + routes: HashMap>>, + default: Box>, +} + +impl Default for HttpResource { + fn default() -> Self { + HttpResource { + state: PhantomData, + routes: HashMap::new(), + default: Box::new(HTTPMethodNotAllowed)} + } +} + + +impl HttpResource where S: 'static { + + pub fn handler(&mut self, method: Method, handler: H) -> &mut Self + where H: RouteHandler + { + self.routes.insert(method, Box::new(handler)); + self + } + + pub fn default_handler(&mut self, handler: H) -> &mut Self + where H: RouteHandler + { + self.default = Box::new(handler); + self + } + + pub fn get(&mut self) -> &mut Self where A: Route + { + self.handler(Method::GET, A::factory()) + } + + pub fn post(&mut self) -> &mut Self where A: Route + { + self.handler(Method::POST, A::factory()) + } + + pub fn put(&mut self) -> &mut Self where A: Route + { + self.handler(Method::PUT, A::factory()) + } + + pub fn delete(&mut self) -> &mut Self where A: Route + { + self.handler(Method::DELETE, A::factory()) + } +} + + +impl RouteHandler for HttpResource { + + fn handle(&self, req: HttpRequest, payload: Option, state: Rc) -> Task { + if let Some(handler) = self.routes.get(req.method()) { + handler.handle(req, payload, state) + } else { + self.default.handle(req, payload, state) + } + } +} + + +#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))] +enum HttpResponseItem where A: Actor> + Route { + Message(HttpMessage, Option), + Actor(A), +} + +pub struct HttpResponse> + Route> (HttpResponseItem); + +impl HttpResponse where A: Actor> + Route +{ + /// Create async response + #[allow(non_snake_case)] + pub fn Stream(act: A) -> Self { + HttpResponse(HttpResponseItem::Actor(act)) + } + + #[allow(non_snake_case)] + pub fn Reply(req: HttpRequest, msg: I) -> Self + where I: IntoHttpMessage + { + HttpResponse(HttpResponseItem::Message(msg.into_response(req), None)) + } + + #[allow(non_snake_case)] + pub fn ReplyMessage(msg: HttpMessage, body: Option) -> Self { + HttpResponse(HttpResponseItem::Message(msg, body)) + } + + pub(crate) fn into(self, mut ctx: HttpContext) -> Task { + match self.0 { + HttpResponseItem::Message(msg, body) => + Task::reply(msg, body), + HttpResponseItem::Actor(act) => { + let old = ctx.replace_actor(act); + mem::forget(old); + Task::with_stream(ctx) + } + } + } +} diff --git a/src/route.rs b/src/route.rs new file mode 100644 index 000000000..c954a508e --- /dev/null +++ b/src/route.rs @@ -0,0 +1,70 @@ +use std; +use std::rc::Rc; +use std::marker::PhantomData; + +use actix::Actor; +use bytes::Bytes; +use futures::unsync::mpsc::Receiver; + +use task::Task; +use context::HttpContext; +use resource::HttpResponse; +use httpmessage::{HttpRequest, HttpMessage}; + +pub type Payload = Receiver; + +#[derive(Debug)] +pub enum PayloadItem { + Eof, + Chunk(Bytes) +} + +impl PayloadItem { + pub fn is_eof(&self) -> bool { + match *self { + PayloadItem::Eof => true, + _ => false, + } + } + pub fn is_chunk(&self) -> bool { + !self.is_eof() + } +} + + +#[derive(Debug)] +#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))] +pub enum Frame { + Message(HttpMessage), + Payload(Option), +} + +pub trait RouteHandler: 'static { + fn handle(&self, req: HttpRequest, payload: Option, state: Rc) -> Task; +} + +pub trait Route: Actor> { + type State; + + fn request(req: HttpRequest, + payload: Option, + ctx: &mut HttpContext) -> HttpResponse; + + fn factory() -> RouteFactory { + RouteFactory(PhantomData) + } +} + + +pub struct RouteFactory, S>(PhantomData); + +impl RouteHandler for RouteFactory + where A: Route, + S: 'static +{ + fn handle(&self, req: HttpRequest, payload: Option, state: Rc) -> Task + { + let mut ctx = HttpContext::new(unsafe{std::mem::uninitialized()}, state); + A::request(req, payload, &mut ctx).into(ctx) + } +} diff --git a/src/router.rs b/src/router.rs new file mode 100644 index 000000000..8aebab707 --- /dev/null +++ b/src/router.rs @@ -0,0 +1,97 @@ +use std::rc::Rc; +use std::string::ToString; +use std::collections::HashMap; +use route_recognizer::{Router as Recognizer}; + +use task::Task; +use route::{Payload, RouteHandler}; +use resource::HttpResource; +use application::HttpApplication; +use httpcodes::HTTPNotFound; +use httpmessage::{HttpRequest, IntoHttpMessage}; + +pub trait HttpHandler: 'static { + fn handle(&self, req: HttpRequest, payload: Option) -> Task; +} + +pub struct RoutingMap { + apps: HashMap>, + resources: HashMap, +} + +impl Default for RoutingMap { + fn default() -> Self { + RoutingMap { + apps: HashMap::new(), + resources: HashMap::new() + } + } +} + +impl RoutingMap { + + pub fn add(&mut self, path: P, app: HttpApplication) + where P: ToString + { + let path = path.to_string(); + + // we can not override registered resource + if self.apps.contains_key(&path) { + panic!("Resource is registered: {}", path); + } + + // add application + self.apps.insert(path.clone(), app.prepare(path)); + } + + pub fn add_resource

(&mut self, path: P) -> &mut HttpResource + where P: ToString + { + let path = path.to_string(); + + // add resource + if !self.resources.contains_key(&path) { + self.resources.insert(path.clone(), HttpResource::default()); + } + + self.resources.get_mut(&path).unwrap() + } + + pub(crate) fn into_router(self) -> Router { + let mut router = Recognizer::new(); + + for (path, resource) in self.resources { + router.add(path.as_str(), resource); + } + + Router { + apps: self.apps, + resources: router, + } + } +} + + +pub(crate) +struct Router { + apps: HashMap>, + resources: Recognizer, +} + +impl Router { + + pub fn call(&self, req: HttpRequest, payload: Option) -> Task + { + if let Ok(h) = self.resources.recognize(req.path()) { + h.handler.handle(req.with_params(h.params), payload, Rc::new(())) + } else { + for (prefix, app) in &self.apps { + if req.path().starts_with(prefix) { + return app.handle(req, payload) + } + } + + Task::reply(IntoHttpMessage::into_response(HTTPNotFound, req), None) + } + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 000000000..e32845db8 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,137 @@ +use std::{io, net}; +use std::rc::Rc; +use std::collections::VecDeque; + +use actix::dev::*; +use futures::{Future, Poll, Async}; +use tokio_core::net::{TcpListener, TcpStream}; + +use task::Task; +use reader::Reader; +use router::{Router, RoutingMap}; + +pub struct HttpServer { + router: Rc, +} + +impl Actor for HttpServer { + type Context = Context; +} + +impl HttpServer { + pub fn new(routes: RoutingMap) -> Self { + HttpServer {router: Rc::new(routes.into_router())} + } + + pub fn serve(self, addr: &net::SocketAddr) -> io::Result + where Self: ActorAddress + { + let tcp = TcpListener::bind(addr, Arbiter::handle())?; + + Ok(HttpServer::create(move |ctx| { + ctx.add_stream(tcp.incoming()); + self + })) + } +} + +impl ResponseType<(TcpStream, net::SocketAddr)> for HttpServer { + type Item = (); + type Error = (); +} + +impl StreamHandler<(TcpStream, net::SocketAddr), io::Error> for HttpServer {} + +impl Handler<(TcpStream, net::SocketAddr), io::Error> for HttpServer { + + fn handle(&mut self, msg: (TcpStream, net::SocketAddr), _: &mut Context) + -> Response + { + Arbiter::handle().spawn( + HttpChannel{router: Rc::clone(&self.router), + addr: msg.1, + stream: msg.0, + reader: Reader::new(), + items: VecDeque::new(), + inactive: Vec::new(), + }); + Response::Empty() + } +} + + +struct Entry { + task: Task, + eof: bool, + error: bool, + finished: bool, +} + +pub struct HttpChannel { + router: Rc, + #[allow(dead_code)] + addr: net::SocketAddr, + stream: TcpStream, + reader: Reader, + items: VecDeque, + inactive: Vec, +} + +impl Actor for HttpChannel { + type Context = Context; +} + +impl Future for HttpChannel { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + loop { + // check in-flight messages + let mut idx = 0; + while idx < self.items.len() { + if idx == 0 { + if self.items[idx].error { + return Err(()) + } + match self.items[idx].task.poll_io(&mut self.stream) { + Ok(Async::Ready(val)) => { + let mut item = self.items.pop_front().unwrap(); + if !val { + item.eof = true; + self.inactive.push(item); + } + continue + }, + Ok(Async::NotReady) => (), + Err(_) => return Err(()), + } + } else if !self.items[idx].finished { + match self.items[idx].task.poll() { + Ok(Async::Ready(_)) => + self.items[idx].finished = true, + Ok(Async::NotReady) => (), + Err(_) => + self.items[idx].error = true, + } + } + idx += 1; + } + + // read incoming data + match self.reader.parse(&mut self.stream) { + Ok(Async::Ready((req, payload))) => { + self.items.push_back( + Entry {task: self.router.call(req, payload), + eof: false, + error: false, + finished: false}); + }, + Ok(Async::NotReady) => + return Ok(Async::NotReady), + Err(_) => + return Err(()), + } + } + } +} diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 000000000..777453359 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,384 @@ +use std::{cmp, io}; +use std::io::Write as IoWrite; +use std::fmt::Write; +use std::collections::VecDeque; + +use http::{StatusCode, Version}; +use bytes::{Bytes, BytesMut}; +use futures::{Async, Future, Poll, Stream}; +use tokio_core::net::TcpStream; + +use hyper::header::{Date, Connection, ContentType, + ContentLength, Encoding, TransferEncoding}; + +use date; +use route::Frame; +use httpmessage::{Body, HttpMessage}; + +type FrameStream = Stream; +const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific +const DEFAULT_LIMIT: usize = 65_536; // max buffer size 64k + + +#[derive(PartialEq, Debug)] +enum TaskRunningState { + Paused, + Running, + Done, +} + +impl TaskRunningState { + fn is_done(&self) -> bool { + *self == TaskRunningState::Done + } +} + +#[derive(PartialEq, Debug)] +enum TaskIOState { + ReadingMessage, + ReadingPayload, + Done, +} + +impl TaskIOState { + fn is_done(&self) -> bool { + *self == TaskIOState::Done + } +} + +pub struct Task { + state: TaskRunningState, + iostate: TaskIOState, + frames: VecDeque, + stream: Option>, + encoder: Encoder, + buffer: BytesMut, +} + +impl Task { + + pub(crate) fn reply(msg: HttpMessage, body: Option) -> Self { + let mut frames = VecDeque::new(); + if let Some(body) = body { + frames.push_back(Frame::Message(msg)); + frames.push_back(Frame::Payload(Some(body))); + frames.push_back(Frame::Payload(None)); + } else { + frames.push_back(Frame::Message(msg)); + } + + Task { + state: TaskRunningState::Running, + iostate: TaskIOState::Done, + frames: frames, + stream: None, + encoder: Encoder::length(0), + buffer: BytesMut::new(), + } + } + + pub(crate) fn with_stream(stream: S) -> Self + where S: Stream + 'static + { + Task { + state: TaskRunningState::Running, + iostate: TaskIOState::ReadingMessage, + frames: VecDeque::new(), + stream: Some(Box::new(stream)), + encoder: Encoder::length(0), + buffer: BytesMut::new(), + } + } + + fn prepare(&mut self, mut msg: HttpMessage) + { + trace!("Prepare message status={:?}", msg.status); + + let mut extra = 0; + let body = msg.set_body(Body::Empty); + match body { + Body::Empty => { + if msg.chunked() { + error!("Chunked transfer is enabled but body is set to Empty"); + } + msg.headers.set(ContentLength(0)); + msg.headers.remove::(); + self.encoder = Encoder::length(0); + }, + Body::Length(n) => { + if msg.chunked() { + error!("Chunked transfer is enabled but body with specific length is specified"); + } + msg.headers.set(ContentLength(n)); + msg.headers.remove::(); + self.encoder = Encoder::length(n); + }, + Body::Binary(ref bytes) => { + extra = bytes.len(); + msg.headers.set(ContentLength(bytes.len() as u64)); + msg.headers.remove::(); + self.encoder = Encoder::length(0); + } + Body::Streaming => { + if msg.chunked() { + if msg.version < Version::HTTP_11 { + error!("Chunked transfer encoding is forbidden for {:?}", msg.version); + } + msg.headers.remove::(); + msg.headers.set(TransferEncoding(vec![Encoding::Chunked])); + self.encoder = Encoder::chunked(); + } else { + self.encoder = Encoder::eof(); + } + } + } + + // keep-alive + if !msg.headers.has::() { + if msg.keep_alive() { + if msg.version < Version::HTTP_11 { + msg.headers.set(Connection::keep_alive()); + } + } else if msg.version >= Version::HTTP_11 { + msg.headers.set(Connection::close()); + } + } + + // render message + let init_cap = 30 + msg.headers.len() * AVERAGE_HEADER_SIZE + extra; + self.buffer.reserve(init_cap); + + if msg.version == Version::HTTP_11 && msg.status == StatusCode::OK { + self.buffer.extend(b"HTTP/1.1 200 OK\r\n"); + let _ = write!(self.buffer, "{}", msg.headers); + } else { + let _ = write!(self.buffer, "{:?} {}\r\n{}", msg.version, msg.status, msg.headers); + } + // using http::h1::date is quite a lot faster than generating + // a unique Date header each time like req/s goes up about 10% + if !msg.headers.has::() { + self.buffer.reserve(date::DATE_VALUE_LENGTH + 8); + self.buffer.extend(b"Date: "); + date::extend(&mut self.buffer); + self.buffer.extend(b"\r\n"); + } + + // default content-type + if !msg.headers.has::() { + self.buffer.extend(b"ContentType: application/octet-stream\r\n".as_ref()); + } + + self.buffer.extend(b"\r\n"); + + if let Body::Binary(ref bytes) = *msg.body() { + self.buffer.extend(bytes); + return + } + msg.set_body(body); + } + + pub(crate) fn poll_io(&mut self, io: &mut TcpStream) -> Poll { + println!("POLL-IO {:?}", self.frames.len()); + // response is completed + if self.frames.is_empty() && self.iostate.is_done() { + return Ok(Async::Ready(self.state.is_done())); + } else { + // poll stream + if self.state == TaskRunningState::Running { + match self.poll() { + Ok(Async::Ready(_)) => { + self.state = TaskRunningState::Done; + } + Ok(Async::NotReady) => (), + Err(_) => return Err(()) + } + } + + // use exiting frames + while let Some(frame) = self.frames.pop_front() { + match frame { + Frame::Message(message) => { + self.prepare(message); + } + Frame::Payload(chunk) => { + match chunk { + Some(chunk) => { + // TODO: add warning, write after EOF + self.encoder.encode(&mut self.buffer, chunk.as_ref()); + } + None => { + // TODO: add error "not eof"" + if !self.encoder.encode(&mut self.buffer, [].as_ref()) { + debug!("last payload item, but it is not EOF "); + return Err(()) + } + break + } + } + }, + } + } + } + + // write bytes to TcpStream + while !self.buffer.is_empty() { + match io.write(self.buffer.as_ref()) { + Ok(n) => { + self.buffer.split_to(n); + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break + } + Err(_) => return Err(()), + } + } + + // should pause task + if self.state != TaskRunningState::Done { + if self.buffer.len() > DEFAULT_LIMIT { + self.state = TaskRunningState::Paused; + } else if self.state == TaskRunningState::Paused { + self.state = TaskRunningState::Running; + } + } + + // response is completed + if self.buffer.is_empty() && self.iostate.is_done() { + Ok(Async::Ready(self.state.is_done())) + } else { + Ok(Async::NotReady) + } + } +} + +impl Future for Task { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + if let Some(ref mut stream) = self.stream { + loop { + match stream.poll() { + Ok(Async::Ready(Some(frame))) => { + match frame { + Frame::Message(ref msg) => { + if self.iostate != TaskIOState::ReadingMessage { + error!("Non expected frame {:?}", frame); + return Err(()) + } + if msg.body().has_body() { + self.iostate = TaskIOState::ReadingPayload; + } else { + self.iostate = TaskIOState::Done; + } + }, + Frame::Payload(ref chunk) => { + if chunk.is_none() { + self.iostate = TaskIOState::Done; + } else if self.iostate != TaskIOState::ReadingPayload { + error!("Non expected frame {:?}", self.iostate); + return Err(()) + } + }, + } + self.frames.push_back(frame) + }, + Ok(Async::Ready(None)) => + return Ok(Async::Ready(())), + Ok(Async::NotReady) => + return Ok(Async::NotReady), + Err(_) => + return Err(()) + } + } + } else { + Ok(Async::Ready(())) + } + } +} + +/// Encoders to handle different Transfer-Encodings. +#[derive(Debug, Clone)] +struct Encoder { + kind: Kind, +} + +#[derive(Debug, PartialEq, Clone)] +enum Kind { + /// An Encoder for when Transfer-Encoding includes `chunked`. + Chunked(bool), + /// An Encoder for when Content-Length is set. + /// + /// Enforces that the body is not longer than the Content-Length header. + Length(u64), + /// An Encoder for when Content-Length is not known. + /// + /// Appliction decides when to stop writing. + Eof, +} + +impl Encoder { + + pub fn eof() -> Encoder { + Encoder { + kind: Kind::Eof, + } + } + + pub fn chunked() -> Encoder { + Encoder { + kind: Kind::Chunked(false), + } + } + + pub fn length(len: u64) -> Encoder { + Encoder { + kind: Kind::Length(len), + } + } + + /*pub fn is_eof(&self) -> bool { + match self.kind { + Kind::Eof | Kind::Length(0) => true, + Kind::Chunked(eof) => eof, + _ => false, + } + }*/ + + /// Encode message. Return `EOF` state of encoder + pub fn encode(&mut self, dst: &mut BytesMut, msg: &[u8]) -> bool { + match self.kind { + Kind::Eof => { + dst.extend(msg); + msg.is_empty() + }, + Kind::Chunked(ref mut eof) => { + if *eof { + return true; + } + + if msg.is_empty() { + *eof = true; + dst.extend(b"0\r\n\r\n"); + } else { + write!(dst, "{:X}\r\n", msg.len()).unwrap(); + dst.extend(msg); + dst.extend(b"\r\n"); + } + *eof + }, + Kind::Length(ref mut remaining) => { + if msg.is_empty() { + return *remaining == 0 + } + let max = cmp::min(*remaining, msg.len() as u64); + trace!("sized write = {}", max); + dst.extend(msg[..max as usize].as_ref()); + + *remaining -= max as u64; + trace!("encoded {} bytes, remaining = {}", max, remaining); + *remaining == 0 + }, + } + } +}