1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-18 05:41:50 +01:00

drop hyper

This commit is contained in:
Nikolay Kim 2017-10-09 23:07:32 -07:00
parent 994a9e907e
commit 676347d7f6
11 changed files with 218 additions and 192 deletions

View File

@ -30,9 +30,6 @@ sha1 = "0.2"
url = "1.5" url = "1.5"
route-recognizer = "0.1" route-recognizer = "0.1"
hyper = "0.11"
unicase = "2.0"
# tokio # tokio
bytes = "0.4" bytes = "0.4"
futures = "0.1" futures = "0.1"

View File

@ -16,6 +16,7 @@ use httpmessage::HttpRequest;
pub struct Application<S=()> { pub struct Application<S=()> {
state: S, state: S,
default: Resource<S>, default: Resource<S>,
handlers: HashMap<String, Box<RouteHandler<S>>>,
resources: HashMap<String, Resource<S>>, resources: HashMap<String, Resource<S>>,
} }
@ -23,6 +24,7 @@ impl<S> Application<S> where S: 'static
{ {
pub(crate) fn prepare(self, prefix: String) -> Box<Handler> { pub(crate) fn prepare(self, prefix: String) -> Box<Handler> {
let mut router = Router::new(); let mut router = Router::new();
let mut handlers = HashMap::new();
let prefix = if prefix.ends_with('/') {prefix } else { prefix + "/" }; let prefix = if prefix.ends_with('/') {prefix } else { prefix + "/" };
for (path, handler) in self.resources { for (path, handler) in self.resources {
@ -30,10 +32,16 @@ impl<S> Application<S> where S: 'static
router.add(path.as_str(), handler); router.add(path.as_str(), handler);
} }
for (path, mut handler) in self.handlers {
let path = prefix.clone() + path.trim_left_matches('/');
handler.set_prefix(path.clone());
handlers.insert(path, handler);
}
Box::new( Box::new(
InnerApplication { InnerApplication {
state: Rc::new(self.state), state: Rc::new(self.state),
default: self.default, default: self.default,
handlers: handlers,
router: router } router: router }
) )
} }
@ -46,6 +54,7 @@ impl Default for Application<()> {
Application { Application {
state: (), state: (),
default: Resource::default(), default: Resource::default(),
handlers: HashMap::new(),
resources: HashMap::new(), resources: HashMap::new(),
} }
} }
@ -60,6 +69,7 @@ impl<S> Application<S> where S: 'static {
Application { Application {
state: state, state: state,
default: Resource::default(), default: Resource::default(),
handlers: HashMap::new(),
resources: HashMap::new(), resources: HashMap::new(),
} }
} }
@ -77,6 +87,20 @@ impl<S> Application<S> where S: 'static {
self.resources.get_mut(&path).unwrap() self.resources.get_mut(&path).unwrap()
} }
/// Add path handler
pub fn add_handler<H, P>(&mut self, path: P, h: H)
where H: RouteHandler<S> + 'static, P: ToString
{
let path = path.to_string();
// add resource
if self.handlers.contains_key(&path) {
panic!("Handler already registered: {:?}", path);
}
self.handlers.insert(path, Box::new(h));
}
/// Default resource is used if no matches route could be found. /// Default resource is used if no matches route could be found.
pub fn default_resource(&mut self) -> &mut Resource<S> { pub fn default_resource(&mut self) -> &mut Resource<S> {
&mut self.default &mut self.default
@ -88,6 +112,7 @@ pub(crate)
struct InnerApplication<S> { struct InnerApplication<S> {
state: Rc<S>, state: Rc<S>,
default: Resource<S>, default: Resource<S>,
handlers: HashMap<String, Box<RouteHandler<S>>>,
router: Router<Resource<S>>, router: Router<Resource<S>>,
} }
@ -98,6 +123,11 @@ impl<S: 'static> Handler for InnerApplication<S> {
if let Ok(h) = self.router.recognize(req.path()) { if let Ok(h) = self.router.recognize(req.path()) {
h.handler.handle(req.with_params(h.params), payload, Rc::clone(&self.state)) h.handler.handle(req.with_params(h.params), payload, Rc::clone(&self.state))
} else { } else {
for (prefix, handler) in &self.handlers {
if req.path().starts_with(prefix) {
return handler.handle(req, payload, Rc::clone(&self.state))
}
}
self.default.handle(req, payload, Rc::clone(&self.state)) self.default.handle(req, payload, Rc::clone(&self.state))
} }
} }

View File

@ -69,7 +69,7 @@ impl<A> AsyncContextApi<A> for HttpContext<A> where A: Actor<Context=Self> + Rou
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route { impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
pub(crate) fn new(state: Rc<<A as Route>::State>) -> HttpContext<A> pub fn new(state: Rc<<A as Route>::State>) -> HttpContext<A>
{ {
HttpContext { HttpContext {
act: None, act: None,

21
src/dev.rs Normal file
View File

@ -0,0 +1,21 @@
//! The `actix-http` prelude for library developers
//!
//! The purpose of this module is to alleviate imports of many common actix traits
//! by adding a glob import to the top of actix heavy modules:
//!
//! ```
//! # #![allow(unused_imports)]
//! use actix_http::dev::*;
//! ```
pub use ws;
pub use httpcodes;
pub use application::Application;
pub use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse};
pub use payload::{Payload, PayloadItem};
pub use router::RoutingMap;
pub use resource::{Reply, Resource};
pub use route::{Route, RouteFactory, RouteHandler};
pub use server::HttpServer;
pub use context::HttpContext;
pub use task::Task;
pub use route_recognizer::Params;

View File

@ -1,13 +1,10 @@
//! Pieces pertaining to the HTTP message protocol. //! Pieces pertaining to the HTTP message protocol.
use std::{io, mem}; use std::{io, mem};
use std::str::FromStr;
use std::convert::Into; use std::convert::Into;
use bytes::Bytes; use bytes::Bytes;
use http::{Method, StatusCode, Version, Uri}; use http::{Method, StatusCode, Version, Uri, HeaderMap};
use hyper::header::{Header, Headers}; use http::header::{self, HeaderName, HeaderValue};
use hyper::header::{Connection, ConnectionOption,
Expect, Encoding, ContentLength, TransferEncoding};
use Params; use Params;
use error::Error; use error::Error;
@ -23,43 +20,44 @@ pub trait Message {
fn version(&self) -> Version; fn version(&self) -> Version;
fn headers(&self) -> &Headers; fn headers(&self) -> &HeaderMap;
/// Checks if a connection should be kept alive. /// Checks if a connection should be kept alive.
fn should_keep_alive(&self) -> bool { fn keep_alive(&self) -> bool {
let ret = match (self.version(), self.headers().get::<Connection>()) { if let Some(conn) = self.headers().get(header::CONNECTION) {
(Version::HTTP_10, None) => false, if let Ok(conn) = conn.to_str() {
(Version::HTTP_10, Some(conn)) if self.version() == Version::HTTP_10 && !conn.contains("keep-alive") {
if !conn.contains(&ConnectionOption::KeepAlive) => false, false
(Version::HTTP_11, Some(conn)) } else if self.version() == Version::HTTP_11 && conn.contains("close") {
if conn.contains(&ConnectionOption::Close) => false, false
_ => true } else {
}; true
trace!("should_keep_alive(version={:?}, header={:?}) = {:?}", }
self.version(), self.headers().get::<Connection>(), ret); } else {
ret false
}
} else {
self.version() != Version::HTTP_10
}
} }
/// Checks if a connection is expecting a `100 Continue` before sending its body. /// Checks if a connection is expecting a `100 Continue` before sending its body.
#[inline] #[inline]
fn expecting_continue(&self) -> bool { fn expecting_continue(&self) -> bool {
let ret = match (self.version(), self.headers().get::<Expect>()) { if self.version() == Version::HTTP_11 {
(Version::HTTP_11, Some(&Expect::Continue)) => true, if let Some(hdr) = self.headers().get(header::EXPECT) {
_ => false if let Ok(hdr) = hdr.to_str() {
}; return hdr.to_lowercase().contains("continue")
trace!("expecting_continue(version={:?}, header={:?}) = {:?}", }
self.version(), self.headers().get::<Expect>(), ret); }
ret }
false
} }
fn is_chunked(&self) -> Result<bool, Error> { fn is_chunked(&self) -> Result<bool, Error> {
if let Some(&TransferEncoding(ref encodings)) = self.headers().get() { if let Some(encodings) = self.headers().get(header::TRANSFER_ENCODING) {
// https://tools.ietf.org/html/rfc7230#section-3.3.3 if let Ok(s) = encodings.to_str() {
// If Transfer-Encoding header is present, and 'chunked' is return Ok(s.to_lowercase().contains("chunked"))
// not the final encoding, and this is a Request, then it is
// mal-formed. A server should responsed with 400 Bad Request.
if encodings.last() == Some(&Encoding::Chunked) {
Ok(true)
} else { } else {
debug!("request with transfer-encoding header, but not chunked, bad request"); debug!("request with transfer-encoding header, but not chunked, bad request");
Err(Error::Header) Err(Error::Header)
@ -77,7 +75,7 @@ pub struct HttpRequest {
version: Version, version: Version,
method: Method, method: Method,
uri: Uri, uri: Uri,
headers: Headers, headers: HeaderMap,
params: Params, params: Params,
} }
@ -85,7 +83,7 @@ impl Message for HttpRequest {
fn version(&self) -> Version { fn version(&self) -> Version {
self.version self.version
} }
fn headers(&self) -> &Headers { fn headers(&self) -> &HeaderMap {
&self.headers &self.headers
} }
} }
@ -93,7 +91,7 @@ impl Message for HttpRequest {
impl HttpRequest { impl HttpRequest {
/// Construct a new Request. /// Construct a new Request.
#[inline] #[inline]
pub fn new(method: Method, uri: Uri, version: Version, headers: Headers) -> Self { pub fn new(method: Method, uri: Uri, version: Version, headers: HeaderMap) -> Self {
HttpRequest { HttpRequest {
method: method, method: method,
uri: uri, uri: uri,
@ -113,7 +111,7 @@ impl HttpRequest {
/// Read the Request headers. /// Read the Request headers.
#[inline] #[inline]
pub fn headers(&self) -> &Headers { &self.headers } pub fn headers(&self) -> &HeaderMap { &self.headers }
/// Read the Request method. /// Read the Request method.
#[inline] #[inline]
@ -142,7 +140,7 @@ impl HttpRequest {
/// Get a mutable reference to the Request headers. /// Get a mutable reference to the Request headers.
#[inline] #[inline]
pub fn headers_mut(&mut self) -> &mut Headers { pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.headers &mut self.headers
} }
@ -164,27 +162,13 @@ impl HttpRequest {
} }
} }
/// Is keepalive enabled by client?
pub fn keep_alive(&self) -> bool {
let ret = match (self.version(), self.headers().get::<Connection>()) {
(Version::HTTP_10, None) => false,
(Version::HTTP_10, Some(conn))
if !conn.contains(&ConnectionOption::KeepAlive) => false,
(Version::HTTP_11, Some(conn))
if conn.contains(&ConnectionOption::Close) => false,
_ => true
};
trace!("should_keep_alive(version={:?}, header={:?}) = {:?}",
self.version(), self.headers().get::<Connection>(), ret);
ret
}
pub(crate) fn is_upgrade(&self) -> bool { pub(crate) fn is_upgrade(&self) -> bool {
if let Some(&Connection(ref conn)) = self.headers().get() { if let Some(ref conn) = self.headers().get(header::CONNECTION) {
conn.contains(&ConnectionOption::from_str("upgrade").unwrap()) if let Ok(s) = conn.to_str() {
} else { return s.to_lowercase().contains("upgrade")
false }
} }
false
} }
} }
@ -225,12 +209,12 @@ pub trait IntoHttpResponse {
pub struct HttpResponse { pub struct HttpResponse {
request: HttpRequest, request: HttpRequest,
pub version: Version, pub version: Version,
pub headers: Headers, pub headers: HeaderMap,
pub status: StatusCode, pub status: StatusCode,
reason: Option<&'static str>, reason: Option<&'static str>,
body: Body, body: Body,
chunked: bool, chunked: bool,
compression: Option<Encoding>, // compression: Option<Encoding>,
connection_type: Option<ConnectionType>, connection_type: Option<ConnectionType>,
} }
@ -238,7 +222,7 @@ impl Message for HttpResponse {
fn version(&self) -> Version { fn version(&self) -> Version {
self.version self.version
} }
fn headers(&self) -> &Headers { fn headers(&self) -> &HeaderMap {
&self.headers &self.headers
} }
} }
@ -256,7 +240,7 @@ impl HttpResponse {
reason: None, reason: None,
body: body, body: body,
chunked: false, chunked: false,
compression: None, // compression: None,
connection_type: None, connection_type: None,
} }
} }
@ -275,13 +259,13 @@ impl HttpResponse {
/// Get the headers from the response. /// Get the headers from the response.
#[inline] #[inline]
pub fn headers(&self) -> &Headers { pub fn headers(&self) -> &HeaderMap {
&self.headers &self.headers
} }
/// Get a mutable reference to the headers. /// Get a mutable reference to the headers.
#[inline] #[inline]
pub fn headers_mut(&mut self) -> &mut Headers { pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.headers &mut self.headers
} }
@ -300,14 +284,14 @@ impl HttpResponse {
/// Set a header and move the Response. /// Set a header and move the Response.
#[inline] #[inline]
pub fn set_header<H: Header>(mut self, header: H) -> Self { pub fn set_header(mut self, name: HeaderName, value: HeaderValue) -> Self {
self.headers.set(header); self.headers.insert(name, value);
self self
} }
/// Set the headers. /// Set the headers.
#[inline] #[inline]
pub fn with_headers(mut self, headers: Headers) -> Self { pub fn with_headers(mut self, headers: HeaderMap) -> Self {
self.headers = headers; self.headers = headers;
self self
} }
@ -335,7 +319,7 @@ impl HttpResponse {
if let Some(ConnectionType::KeepAlive) = self.connection_type { if let Some(ConnectionType::KeepAlive) = self.connection_type {
true true
} else { } else {
self.request.should_keep_alive() self.request.keep_alive()
} }
} }
@ -351,7 +335,7 @@ impl HttpResponse {
/// Enables automatic chunked transfer encoding /// Enables automatic chunked transfer encoding
pub fn enable_chunked_encoding(&mut self) -> Result<(), io::Error> { pub fn enable_chunked_encoding(&mut self) -> Result<(), io::Error> {
if self.headers.has::<ContentLength>() { if self.headers.contains_key(header::CONTENT_LENGTH) {
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"You can't enable chunked encoding when a content length is set")) "You can't enable chunked encoding when a content length is set"))
} else { } else {

View File

@ -11,9 +11,6 @@ extern crate futures;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_proto; extern crate tokio_proto;
#[macro_use]
extern crate hyper;
extern crate unicase;
extern crate http; extern crate http;
extern crate httparse; extern crate httparse;
@ -33,11 +30,11 @@ mod router;
mod task; mod task;
mod reader; mod reader;
mod server; mod server;
pub mod ws;
mod wsframe; mod wsframe;
mod wsproto; mod wsproto;
pub mod ws;
pub mod dev;
pub mod httpcodes; pub mod httpcodes;
pub use application::Application; pub use application::Application;
pub use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse}; pub use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse};

View File

@ -1,13 +1,12 @@
use std::{self, fmt, io, ptr}; use std::{self, fmt, io, ptr};
use httparse; use httparse;
use http::{Method, Version, Uri, HttpTryFrom}; use http::{Method, Version, Uri, HttpTryFrom, HeaderMap};
use bytes::{Bytes, BytesMut, BufMut}; use http::header::{self, HeaderName, HeaderValue};
use bytes::{BytesMut, BufMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use hyper::header::{Headers, ContentLength};
use error::{Error, Result}; use error::{Error, Result};
use decode::Decoder; use decode::Decoder;
use payload::{Payload, PayloadSender}; use payload::{Payload, PayloadSender};
@ -50,8 +49,7 @@ impl Reader {
b'\r' | b'\n' => i += 1, b'\r' | b'\n' => i += 1,
_ => break, _ => break,
} }
} } self.read_buf.split_to(i);
self.read_buf.split_to(i);
} }
} }
@ -82,13 +80,10 @@ impl Reader {
pub fn parse<T>(&mut self, io: &mut T) -> Poll<(HttpRequest, Payload), Error> pub fn parse<T>(&mut self, io: &mut T) -> Poll<(HttpRequest, Payload), Error>
where T: AsyncRead where T: AsyncRead
{ {
loop { loop {
match self.decode()? { match self.decode()? {
Decoding::Paused => return Ok(Async::NotReady), Decoding::Paused => return Ok(Async::NotReady),
Decoding::Ready => { Decoding::Ready => {
println!("decode ready");
self.payload = None; self.payload = None;
break break
}, },
@ -117,7 +112,6 @@ impl Reader {
Decoding::Paused => Decoding::Paused =>
break, break,
Decoding::Ready => { Decoding::Ready => {
println!("decoded 3");
self.payload = None; self.payload = None;
break break
}, },
@ -238,38 +232,56 @@ pub fn parse(buf: &mut BytesMut) -> Result<Option<(HttpRequest, Option<Decoder>)
} }
}; };
let mut headers = Headers::with_capacity(headers_len);
let slice = buf.split_to(len).freeze(); let slice = buf.split_to(len).freeze();
let path = slice.slice(path.0, path.1); let path = slice.slice(path.0, path.1);
// path was found to be utf8 by httparse // path was found to be utf8 by httparse
let uri = Uri::from_shared(path).map_err(|_| Error::Uri)?; let uri = Uri::from_shared(path).map_err(|_| Error::Uri)?;
headers.extend(HeadersAsBytesIter { // convert headers
headers: headers_indices[..headers_len].iter(), let mut headers = HeaderMap::with_capacity(headers_len);
slice: slice, for header in headers_indices[..headers_len].iter() {
}); if let Ok(name) = HeaderName::try_from(slice.slice(header.name.0, header.name.1)) {
if let Ok(value) = HeaderValue::try_from(
slice.slice(header.value.0, header.value.1))
{
headers.insert(name, value);
} else {
return Err(Error::Header)
}
} else {
return Err(Error::Header)
}
}
let msg = HttpRequest::new(method, uri, version, headers); let msg = HttpRequest::new(method, uri, version, headers);
let upgrade = msg.is_upgrade() || *msg.method() == Method::CONNECT; let upgrade = msg.is_upgrade() || *msg.method() == Method::CONNECT;
let chunked = msg.is_chunked()?; let chunked = msg.is_chunked()?;
if upgrade { let decoder = if upgrade {
Ok(Some((msg, Some(Decoder::eof())))) Some(Decoder::eof())
} }
// Content-Length // Content-Length
else if let Some(&ContentLength(len)) = msg.headers().get() { else if let Some(ref len) = msg.headers().get(header::CONTENT_LENGTH) {
if chunked { if chunked {
return Err(Error::Header) return Err(Error::Header)
} }
Ok(Some((msg, Some(Decoder::length(len))))) if let Ok(s) = len.to_str() {
} else if msg.headers().has::<ContentLength>() { if let Ok(len) = s.parse::<u64>() {
debug!("illegal Content-Length: {:?}", msg.headers().get_raw("Content-Length")); Some(Decoder::length(len))
Err(Error::Header) } else {
debug!("illegal Content-Length: {:?}", len);
return Err(Error::Header)
}
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(Error::Header)
}
} else if chunked { } else if chunked {
Ok(Some((msg, Some(Decoder::chunked())))) Some(Decoder::chunked())
} else { } else {
Ok(Some((msg, None))) None
} };
Ok(Some((msg, decoder)))
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@ -292,24 +304,3 @@ fn record_header_indices(bytes: &[u8],
indices.value = (value_start, value_end); indices.value = (value_start, value_end);
} }
} }
struct HeadersAsBytesIter<'a> {
headers: ::std::slice::Iter<'a, HeaderIndices>,
slice: Bytes,
}
impl<'a> Iterator for HeadersAsBytesIter<'a> {
type Item = (&'a str, Bytes);
fn next(&mut self) -> Option<Self::Item> {
self.headers.next().map(|header| {
let name = unsafe {
let bytes = ::std::slice::from_raw_parts(
self.slice.as_ref().as_ptr().offset(header.name.0 as isize),
header.name.1 - header.name.0
);
::std::str::from_utf8_unchecked(bytes)
};
(name, self.slice.slice(header.value.0, header.value.1))
})
}
}

View File

@ -66,25 +66,29 @@ impl<S> Resource<S> where S: 'static {
} }
/// Handler for `GET` method. /// Handler for `GET` method.
pub fn get<A>(&mut self) -> &mut Self where A: Route<State=S> pub fn get<A>(&mut self) -> &mut Self
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{ {
self.handler(Method::GET, A::factory()) self.handler(Method::GET, A::factory())
} }
/// Handler for `POST` method. /// Handler for `POST` method.
pub fn post<A>(&mut self) -> &mut Self where A: Route<State=S> pub fn post<A>(&mut self) -> &mut Self
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{ {
self.handler(Method::POST, A::factory()) self.handler(Method::POST, A::factory())
} }
/// Handler for `PUR` method. /// Handler for `PUR` method.
pub fn put<A>(&mut self) -> &mut Self where A: Route<State=S> pub fn put<A>(&mut self) -> &mut Self
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{ {
self.handler(Method::PUT, A::factory()) self.handler(Method::PUT, A::factory())
} }
/// Handler for `METHOD` method. /// Handler for `METHOD` method.
pub fn delete<A>(&mut self) -> &mut Self where A: Route<State=S> pub fn delete<A>(&mut self) -> &mut Self
where A: Actor<Context=HttpContext<A>> + Route<State=S>
{ {
self.handler(Method::DELETE, A::factory()) self.handler(Method::DELETE, A::factory())
} }
@ -104,15 +108,15 @@ impl<S: 'static> RouteHandler<S> for Resource<S> {
#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))] #[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))]
enum ReplyItem<A> where A: Actor<Context=HttpContext<A>> + Route { enum ReplyItem<A> where A: Actor + Route {
Message(HttpResponse), Message(HttpResponse),
Actor(A), Actor(A),
} }
/// Represents response process. /// Represents response process.
pub struct Reply<A: Actor<Context=HttpContext<A>> + Route> (ReplyItem<A>); pub struct Reply<A: Actor + Route> (ReplyItem<A>);
impl<A> Reply<A> where A: Actor<Context=HttpContext<A>> + Route impl<A> Reply<A> where A: Actor + Route
{ {
/// Create async response /// Create async response
pub fn stream(act: A) -> Self { pub fn stream(act: A) -> Self {
@ -129,7 +133,8 @@ impl<A> Reply<A> where A: Actor<Context=HttpContext<A>> + Route
Reply(ReplyItem::Message(msg.response(req))) Reply(ReplyItem::Message(msg.response(req)))
} }
pub(crate) fn into(self, mut ctx: HttpContext<A>) -> Task { pub fn into(self, mut ctx: HttpContext<A>) -> Task where A: Actor<Context=HttpContext<A>>
{
match self.0 { match self.0 {
ReplyItem::Message(msg) => { ReplyItem::Message(msg) => {
Task::reply(msg) Task::reply(msg)

View File

@ -20,11 +20,15 @@ pub enum Frame {
/// Trait defines object that could be regestered as resource route /// Trait defines object that could be regestered as resource route
pub trait RouteHandler<S>: 'static { pub trait RouteHandler<S>: 'static {
/// Handle request
fn handle(&self, req: HttpRequest, payload: Payload, state: Rc<S>) -> Task; fn handle(&self, req: HttpRequest, payload: Payload, state: Rc<S>) -> Task;
/// Set route prefix
fn set_prefix(&mut self, _prefix: String) {}
} }
/// Actors with ability to handle http requests /// Actors with ability to handle http requests
pub trait Route: Actor<Context=HttpContext<Self>> { pub trait Route: Actor {
/// Route shared state. State is shared with all routes within same application and could be /// Route shared state. State is shared with all routes within same application and could be
/// accessed with `HttpContext::state()` method. /// accessed with `HttpContext::state()` method.
type State; type State;
@ -33,7 +37,7 @@ pub trait Route: Actor<Context=HttpContext<Self>> {
/// result immediately with `Reply::reply` or `Reply::with`. /// result immediately with `Reply::reply` or `Reply::with`.
/// Actor itself could be returned for handling streaming request/response. /// Actor itself could be returned for handling streaming request/response.
/// In that case `HttpContext::start` and `HttpContext::write` has to be used. /// In that case `HttpContext::start` and `HttpContext::write` has to be used.
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self>; fn request(req: HttpRequest, payload: Payload, ctx: &mut Self::Context) -> Reply<Self>;
/// This method creates `RouteFactory` for this actor. /// This method creates `RouteFactory` for this actor.
fn factory() -> RouteFactory<Self, Self::State> { fn factory() -> RouteFactory<Self, Self::State> {
@ -45,7 +49,7 @@ pub trait Route: Actor<Context=HttpContext<Self>> {
pub struct RouteFactory<A: Route<State=S>, S>(PhantomData<A>); pub struct RouteFactory<A: Route<State=S>, S>(PhantomData<A>);
impl<A, S> RouteHandler<S> for RouteFactory<A, S> impl<A, S> RouteHandler<S> for RouteFactory<A, S>
where A: Route<State=S>, where A: Actor<Context=HttpContext<A>> + Route<State=S>,
S: 'static S: 'static
{ {
fn handle(&self, req: HttpRequest, payload: Payload, state: Rc<A::State>) -> Task fn handle(&self, req: HttpRequest, payload: Payload, state: Rc<A::State>) -> Task

View File

@ -4,14 +4,12 @@ use std::fmt::Write;
use std::collections::VecDeque; use std::collections::VecDeque;
use http::{StatusCode, Version}; use http::{StatusCode, Version};
use http::header::{HeaderValue,
CONNECTION, CONTENT_TYPE, CONTENT_LENGTH, TRANSFER_ENCODING, DATE};
use bytes::BytesMut; use bytes::BytesMut;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use unicase::Ascii;
use hyper::header::{Date, Connection, ConnectionOption,
ContentType, ContentLength, Encoding, TransferEncoding};
use date; use date;
use route::Frame; use route::Frame;
use httpmessage::{Body, HttpResponse}; use httpmessage::{Body, HttpResponse};
@ -100,22 +98,26 @@ impl Task {
if msg.chunked() { if msg.chunked() {
error!("Chunked transfer is enabled but body is set to Empty"); error!("Chunked transfer is enabled but body is set to Empty");
} }
msg.headers.set(ContentLength(0)); msg.headers.insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
msg.headers.remove::<TransferEncoding>(); msg.headers.remove(TRANSFER_ENCODING);
self.encoder = Encoder::length(0); self.encoder = Encoder::length(0);
}, },
Body::Length(n) => { Body::Length(n) => {
if msg.chunked() { if msg.chunked() {
error!("Chunked transfer is enabled but body with specific length is specified"); error!("Chunked transfer is enabled but body with specific length is specified");
} }
msg.headers.set(ContentLength(n)); msg.headers.insert(
msg.headers.remove::<TransferEncoding>(); CONTENT_LENGTH,
HeaderValue::from_str(format!("{}", n).as_str()).unwrap());
msg.headers.remove(TRANSFER_ENCODING);
self.encoder = Encoder::length(n); self.encoder = Encoder::length(n);
}, },
Body::Binary(ref bytes) => { Body::Binary(ref bytes) => {
extra = bytes.len(); extra = bytes.len();
msg.headers.set(ContentLength(bytes.len() as u64)); msg.headers.insert(
msg.headers.remove::<TransferEncoding>(); CONTENT_LENGTH,
HeaderValue::from_str(format!("{}", bytes.len()).as_str()).unwrap());
msg.headers.remove(TRANSFER_ENCODING);
self.encoder = Encoder::length(0); self.encoder = Encoder::length(0);
} }
Body::Streaming => { Body::Streaming => {
@ -123,16 +125,15 @@ impl Task {
if msg.version < Version::HTTP_11 { if msg.version < Version::HTTP_11 {
error!("Chunked transfer encoding is forbidden for {:?}", msg.version); error!("Chunked transfer encoding is forbidden for {:?}", msg.version);
} }
msg.headers.remove::<ContentLength>(); msg.headers.remove(CONTENT_LENGTH);
msg.headers.set(TransferEncoding(vec![Encoding::Chunked])); msg.headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
self.encoder = Encoder::chunked(); self.encoder = Encoder::chunked();
} else { } else {
self.encoder = Encoder::eof(); self.encoder = Encoder::eof();
} }
} }
Body::Upgrade => { Body::Upgrade => {
msg.headers.set(Connection(vec![ msg.headers.insert(CONNECTION, HeaderValue::from_static("upgrade"));
ConnectionOption::ConnectionHeader(Ascii::new("upgrade".to_owned()))]));
self.encoder = Encoder::eof(); self.encoder = Encoder::eof();
} }
} }
@ -140,10 +141,10 @@ impl Task {
// keep-alive // keep-alive
if msg.keep_alive() { if msg.keep_alive() {
if msg.version < Version::HTTP_11 { if msg.version < Version::HTTP_11 {
msg.headers.set(Connection::keep_alive()); msg.headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
} }
} else if msg.version >= Version::HTTP_11 { } else if msg.version >= Version::HTTP_11 {
msg.headers.set(Connection::close()); msg.headers.insert(CONNECTION, HeaderValue::from_static("close"));
} }
// render message // render message
@ -152,14 +153,20 @@ impl Task {
if msg.version == Version::HTTP_11 && msg.status == StatusCode::OK { if msg.version == Version::HTTP_11 && msg.status == StatusCode::OK {
self.buffer.extend(b"HTTP/1.1 200 OK\r\n"); self.buffer.extend(b"HTTP/1.1 200 OK\r\n");
let _ = write!(self.buffer, "{}", msg.headers);
} else { } else {
let _ = write!(self.buffer, "{:?} {}\r\n{}", msg.version, msg.status, msg.headers); let _ = write!(self.buffer, "{:?} {}\r\n", msg.version, msg.status);
}
for (key, value) in &msg.headers {
let t: &[u8] = key.as_ref();
self.buffer.extend(t);
self.buffer.extend(b": ");
self.buffer.extend(value.as_ref());
self.buffer.extend(b"\r\n");
} }
// using http::h1::date is quite a lot faster than generating // using http::h1::date is quite a lot faster than generating
// a unique Date header each time like req/s goes up about 10% // a unique Date header each time like req/s goes up about 10%
if !msg.headers.has::<Date>() { if !msg.headers.contains_key(DATE) {
self.buffer.reserve(date::DATE_VALUE_LENGTH + 8); self.buffer.reserve(date::DATE_VALUE_LENGTH + 8);
self.buffer.extend(b"Date: "); self.buffer.extend(b"Date: ");
date::extend(&mut self.buffer); date::extend(&mut self.buffer);
@ -167,7 +174,7 @@ impl Task {
} }
// default content-type // default content-type
if !msg.headers.has::<ContentType>() { if !msg.headers.contains_key(CONTENT_TYPE) {
self.buffer.extend(b"ContentType: application/octet-stream\r\n".as_ref()); self.buffer.extend(b"ContentType: application/octet-stream\r\n".as_ref());
} }

View File

@ -64,10 +64,10 @@
//! fn main() {} //! fn main() {}
//! ``` //! ```
use std::vec::Vec; use std::vec::Vec;
use http::{Method, StatusCode}; use std::str::FromStr;
use http::{Method, StatusCode, header};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use hyper::header;
use actix::Actor; use actix::Actor;
@ -81,22 +81,13 @@ use wsframe;
use wsproto::*; use wsproto::*;
#[doc(hidden)] #[doc(hidden)]
header! { const SEC_WEBSOCKET_ACCEPT: &'static str = "SEC-WEBSOCKET-ACCEPT";
/// SEC-WEBSOCKET-ACCEPT header #[doc(hidden)]
(WebSocketAccept, "SEC-WEBSOCKET-ACCEPT") => [String] const SEC_WEBSOCKET_KEY: &'static str = "SEC-WEBSOCKET-KEY";
} #[doc(hidden)]
header! { const SEC_WEBSOCKET_VERSION: &'static str = "SEC-WEBSOCKET-VERSION";
/// SEC-WEBSOCKET-KEY header // #[doc(hidden)]
(WebSocketKey, "SEC-WEBSOCKET-KEY") => [String] // const SEC_WEBSOCKET_PROTOCOL: &'static str = "SEC-WEBSOCKET-PROTOCOL";
}
header! {
/// SEC-WEBSOCKET-VERSION header
(WebSocketVersion, "SEC-WEBSOCKET-VERSION") => [String]
}
header! {
/// SEC-WEBSOCKET-PROTOCOL header
(WebSocketProtocol, "SEC-WEBSOCKET-PROTOCOL") => [String]
}
/// `WebSocket` Message /// `WebSocket` Message
@ -126,8 +117,12 @@ pub fn handshake(req: HttpRequest) -> Result<HttpResponse, HttpResponse> {
} }
// Check for "UPGRADE" to websocket header // Check for "UPGRADE" to websocket header
let has_hdr = if let Some::<&header::Upgrade>(hdr) = req.headers().get() { let has_hdr = if let Some(hdr) = req.headers().get(header::UPGRADE) {
hdr.0.contains(&header::Protocol::new(header::ProtocolName::WebSocket, None)) if let Ok(s) = hdr.to_str() {
s.to_lowercase().contains("websocket")
} else {
false
}
} else { } else {
false false
}; };
@ -141,14 +136,14 @@ pub fn handshake(req: HttpRequest) -> Result<HttpResponse, HttpResponse> {
} }
// check supported version // check supported version
if !req.headers().has::<WebSocketVersion>() { if !req.headers().contains_key(SEC_WEBSOCKET_VERSION) {
return Err(HTTPBadRequest.with_reason(req, "No websocket version header is required")) return Err(HTTPBadRequest.with_reason(req, "No websocket version header is required"))
} }
let supported_ver = { let supported_ver = {
let hdr = req.headers().get::<WebSocketVersion>().unwrap(); if let Some(hdr) = req.headers().get(SEC_WEBSOCKET_VERSION) {
match hdr.0.as_str() { hdr == "13" || hdr == "8" || hdr == "7"
"13" | "8" | "7" => true, } else {
_ => false, false
} }
}; };
if !supported_ver { if !supported_ver {
@ -156,25 +151,20 @@ pub fn handshake(req: HttpRequest) -> Result<HttpResponse, HttpResponse> {
} }
// check client handshake for validity // check client handshake for validity
let key = if let Some::<&WebSocketKey>(hdr) = req.headers().get() { if !req.headers().contains_key(SEC_WEBSOCKET_KEY) {
Some(hash_key(hdr.0.as_bytes()))
} else {
None
};
let key = if let Some(key) = key {
key
} else {
return Err(HTTPBadRequest.with_reason(req, "Handshake error")); return Err(HTTPBadRequest.with_reason(req, "Handshake error"));
}
let key = {
let key = req.headers().get(SEC_WEBSOCKET_KEY).unwrap();
hash_key(key.as_ref())
}; };
Ok(HttpResponse::new(req, StatusCode::SWITCHING_PROTOCOLS, Body::Empty) Ok(HttpResponse::new(req, StatusCode::SWITCHING_PROTOCOLS, Body::Empty)
.set_connection_type(ConnectionType::Upgrade) .set_connection_type(ConnectionType::Upgrade)
.set_header( .set_header(header::UPGRADE, header::HeaderValue::from_static("websocket"))
header::Upgrade(vec![header::Protocol::new(header::ProtocolName::WebSocket, None)])) .set_header(header::TRANSFER_ENCODING, header::HeaderValue::from_static("chunked"))
.set_header( .set_header(header::HeaderName::from_str(SEC_WEBSOCKET_ACCEPT).unwrap(),
header::TransferEncoding(vec![header::Encoding::Chunked])) header::HeaderValue::from_str(key.as_str()).unwrap())
.set_header(
WebSocketAccept(key))
.set_body(Body::Upgrade) .set_body(Body::Upgrade)
) )
} }