1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-29 19:24:58 +02:00

refactor HttpRequest mutability

This commit is contained in:
Nikolay Kim
2018-06-25 10:58:04 +06:00
parent 445ea043dd
commit fec6047ddc
51 changed files with 2239 additions and 2156 deletions

25
src/server/error.rs Normal file
View File

@ -0,0 +1,25 @@
use futures::{Async, Poll};
use super::{helpers, HttpHandlerTask, Writer};
use http::{StatusCode, Version};
use httpresponse::HttpResponse;
use Error;
pub(crate) struct ServerError(Version, StatusCode);
impl ServerError {
pub fn err(ver: Version, status: StatusCode) -> Box<HttpHandlerTask> {
Box::new(ServerError(ver, status))
}
}
impl HttpHandlerTask for ServerError {
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
{
let mut bytes = io.buffer();
helpers::write_status_line(self.0, self.1.as_u16(), bytes);
}
io.set_date();
Ok(Async::Ready(true))
}
}

View File

@ -8,11 +8,13 @@ use futures::{Async, Future, Poll};
use tokio_timer::Delay;
use error::{Error, PayloadError};
use http::{StatusCode, Version};
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use payload::{Payload, PayloadStatus, PayloadWriter};
use pipeline::Pipeline;
use super::error::ServerError;
use super::h1decoder::{DecoderError, H1Decoder, Message};
use super::h1writer::H1Writer;
use super::input::PayloadType;
@ -180,9 +182,7 @@ where
&& self.tasks.len() < MAX_PIPELINED_MESSAGES
&& self.can_read()
{
let res = self.stream.get_mut().read_available(&mut self.buf);
match res {
//self.stream.get_mut().read_available(&mut self.buf) {
match self.stream.get_mut().read_available(&mut self.buf) {
Ok(Async::Ready(disconnected)) => {
if disconnected {
// notify all tasks
@ -363,22 +363,19 @@ where
if payload {
let (ps, pl) = Payload::new(false);
msg.get_mut().payload = Some(pl);
self.payload =
Some(PayloadType::new(&msg.get_ref().headers, ps));
*msg.inner.payload.borrow_mut() = Some(pl);
self.payload = Some(PayloadType::new(&msg.inner.headers, ps));
}
let mut req = HttpRequest::from_message(msg);
// set remote addr
req.set_peer_addr(self.addr);
msg.inner.addr = self.addr;
// stop keepalive timer
self.keepalive_timer.take();
// search handler for request
for h in self.settings.handlers().iter_mut() {
req = match h.handle(req) {
msg = match h.handle(msg) {
Ok(mut pipe) => {
if self.tasks.is_empty() {
match pipe.poll_io(&mut self.stream) {
@ -415,15 +412,16 @@ where
});
continue 'outer;
}
Err(req) => req,
Err(msg) => msg,
}
}
// handler is not found
self.tasks.push_back(Entry {
pipe: EntryPipe::Error(
Pipeline::error(HttpResponse::NotFound()),
),
pipe: EntryPipe::Error(ServerError::err(
Version::HTTP_11,
StatusCode::NOT_FOUND,
)),
flags: EntryFlags::empty(),
});
}
@ -475,12 +473,11 @@ mod tests {
use application::HttpApplication;
use httpmessage::HttpMessage;
use server::h1decoder::Message;
use server::helpers::SharedHttpInnerMessage;
use server::settings::WorkerSettings;
use server::KeepAlive;
use server::settings::{ServerSettings, WorkerSettings};
use server::{KeepAlive, Request};
impl Message {
fn message(self) -> SharedHttpInnerMessage {
fn message(self) -> Request {
match self {
Message::Message { msg, payload: _ } => msg,
_ => panic!("error"),
@ -509,9 +506,9 @@ mod tests {
macro_rules! parse_ready {
($e:expr) => {{
let settings: WorkerSettings<HttpApplication> =
WorkerSettings::new(Vec::new(), KeepAlive::Os);
WorkerSettings::new(Vec::new(), KeepAlive::Os, ServerSettings::default());
match H1Decoder::new().decode($e, &settings) {
Ok(Some(msg)) => HttpRequest::from_message(msg.message()),
Ok(Some(msg)) => msg.message(),
Ok(_) => unreachable!("Eof during parsing http request"),
Err(err) => unreachable!("Error during parsing http request: {:?}", err),
}
@ -521,7 +518,7 @@ mod tests {
macro_rules! expect_parse_err {
($e:expr) => {{
let settings: WorkerSettings<HttpApplication> =
WorkerSettings::new(Vec::new(), KeepAlive::Os);
WorkerSettings::new(Vec::new(), KeepAlive::Os, ServerSettings::default());
match H1Decoder::new().decode($e, &settings) {
Err(err) => match err {
@ -605,6 +602,7 @@ mod tests {
let settings = Rc::new(WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
));
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
@ -620,6 +618,7 @@ mod tests {
let settings = Rc::new(WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
));
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf);
@ -631,12 +630,16 @@ mod tests {
#[test]
fn test_parse() {
let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n\r\n");
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
match reader.decode(&mut buf, &settings) {
Ok(Some(msg)) => {
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test");
@ -648,7 +651,11 @@ mod tests {
#[test]
fn test_parse_partial() {
let mut buf = BytesMut::from("PUT /test HTTP/1");
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
match reader.decode(&mut buf, &settings) {
@ -659,7 +666,7 @@ mod tests {
buf.extend(b".1\r\n\r\n");
match reader.decode(&mut buf, &settings) {
Ok(Some(msg)) => {
let mut req = HttpRequest::from_message(msg.message());
let mut req = msg.message();
assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::PUT);
assert_eq!(req.path(), "/test");
@ -671,12 +678,16 @@ mod tests {
#[test]
fn test_parse_post() {
let mut buf = BytesMut::from("POST /test2 HTTP/1.0\r\n\r\n");
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
match reader.decode(&mut buf, &settings) {
Ok(Some(msg)) => {
let mut req = HttpRequest::from_message(msg.message());
let mut req = msg.message();
assert_eq!(req.version(), Version::HTTP_10);
assert_eq!(*req.method(), Method::POST);
assert_eq!(req.path(), "/test2");
@ -689,12 +700,16 @@ mod tests {
fn test_parse_body() {
let mut buf =
BytesMut::from("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody");
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
match reader.decode(&mut buf, &settings) {
Ok(Some(msg)) => {
let mut req = HttpRequest::from_message(msg.message());
let mut req = msg.message();
assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test");
@ -716,12 +731,16 @@ mod tests {
fn test_parse_body_crlf() {
let mut buf =
BytesMut::from("\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody");
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
match reader.decode(&mut buf, &settings) {
Ok(Some(msg)) => {
let mut req = HttpRequest::from_message(msg.message());
let mut req = msg.message();
assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test");
@ -742,14 +761,18 @@ mod tests {
#[test]
fn test_parse_partial_eof() {
let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n");
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
assert!(reader.decode(&mut buf, &settings).unwrap().is_none());
buf.extend(b"\r\n");
match reader.decode(&mut buf, &settings) {
Ok(Some(msg)) => {
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test");
@ -761,7 +784,11 @@ mod tests {
#[test]
fn test_headers_split_field() {
let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n");
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
assert!{ reader.decode(&mut buf, &settings).unwrap().is_none() }
@ -775,7 +802,7 @@ mod tests {
buf.extend(b"t: value\r\n\r\n");
match reader.decode(&mut buf, &settings) {
Ok(Some(msg)) => {
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test");
@ -792,10 +819,14 @@ mod tests {
Set-Cookie: c1=cookie1\r\n\
Set-Cookie: c2=cookie2\r\n\r\n",
);
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
let val: Vec<_> = req
.headers()
@ -988,7 +1019,11 @@ mod tests {
#[test]
fn test_http_request_upgrade() {
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut buf = BytesMut::from(
"GET /test HTTP/1.1\r\n\
connection: upgrade\r\n\
@ -998,7 +1033,7 @@ mod tests {
let mut reader = H1Decoder::new();
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.is_payload());
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
assert!(!req.keep_alive());
assert!(req.upgrade());
assert_eq!(
@ -1054,12 +1089,16 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n",
);
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.is_payload());
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
assert!(req.chunked().unwrap());
buf.extend(b"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n");
@ -1090,11 +1129,15 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n",
);
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.is_payload());
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
assert!(req.chunked().unwrap());
buf.extend(
@ -1112,7 +1155,7 @@ mod tests {
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.is_payload());
let req2 = HttpRequest::from_message(msg.message());
let req2 = msg.message();
assert!(req2.chunked().unwrap());
assert_eq!(*req2.method(), Method::POST);
assert!(req2.chunked().unwrap());
@ -1124,12 +1167,16 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n",
);
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.is_payload());
let req = HttpRequest::from_message(msg.message());
let req = msg.message();
assert!(req.chunked().unwrap());
buf.extend(b"4\r\n1111\r\n");
@ -1171,13 +1218,16 @@ mod tests {
&"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n"[..],
);
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut reader = H1Decoder::new();
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.is_payload());
let req = HttpRequest::from_message(msg.message());
assert!(req.chunked().unwrap());
assert!(msg.message().chunked().unwrap());
buf.extend(b"4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n")
let chunk = reader.decode(&mut buf, &settings).unwrap().unwrap().chunk();

View File

@ -4,12 +4,11 @@ use bytes::{Bytes, BytesMut};
use futures::{Async, Poll};
use httparse;
use super::helpers::SharedHttpInnerMessage;
use super::message::{MessageFlags, Request};
use super::settings::WorkerSettings;
use error::ParseError;
use http::header::{HeaderName, HeaderValue};
use http::{header, HttpTryFrom, Method, Uri, Version};
use httprequest::MessageFlags;
use uri::Url;
const MAX_BUFFER_SIZE: usize = 131_072;
@ -20,10 +19,7 @@ pub(crate) struct H1Decoder {
}
pub(crate) enum Message {
Message {
msg: SharedHttpInnerMessage,
payload: bool,
},
Message { msg: Request, payload: bool },
Chunk(Bytes),
Eof,
}
@ -84,7 +80,7 @@ impl H1Decoder {
fn parse_message<H>(
&self, buf: &mut BytesMut, settings: &WorkerSettings<H>,
) -> Poll<(SharedHttpInnerMessage, Option<EncodingDecoder>), ParseError> {
) -> Poll<(Request, Option<EncodingDecoder>), ParseError> {
// Parse http message
let mut has_upgrade = false;
let mut chunked = false;
@ -122,11 +118,12 @@ impl H1Decoder {
let slice = buf.split_to(len).freeze();
// convert headers
let mut msg = settings.get_http_message();
let mut msg = settings.get_request_context();
{
let msg_mut = msg.get_mut();
msg_mut
let inner = &mut msg.inner;
inner
.flags
.get_mut()
.set(MessageFlags::KEEPALIVE, version != Version::HTTP_10);
for idx in headers[..headers_len].iter() {
@ -177,20 +174,20 @@ impl H1Decoder {
} else {
false
};
msg_mut.flags.set(MessageFlags::KEEPALIVE, ka);
inner.flags.get_mut().set(MessageFlags::KEEPALIVE, ka);
}
_ => (),
}
msg_mut.headers.append(name, value);
inner.headers.append(name, value);
} else {
return Err(ParseError::Header);
}
}
msg_mut.url = path;
msg_mut.method = method;
msg_mut.version = version;
inner.url = path;
inner.method = method;
inner.version = version;
}
msg
};
@ -202,7 +199,7 @@ impl H1Decoder {
} else if let Some(len) = content_length {
// Content-Length
Some(EncodingDecoder::length(len))
} else if has_upgrade || msg.get_ref().method == Method::CONNECT {
} else if has_upgrade || msg.inner.method == Method::CONNECT {
// upgrade(websocket) or connect
Some(EncodingDecoder::eof())
} else {

View File

@ -7,14 +7,16 @@ use std::rc::Rc;
use tokio_io::AsyncWrite;
use super::helpers;
use super::output::Output;
use super::output::{Output, ResponseInfo, ResponseLength};
use super::settings::WorkerSettings;
use super::Request;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body};
use header::ContentEncoding;
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
use http::header::{
HeaderValue, CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, DATE, TRANSFER_ENCODING,
};
use http::{Method, Version};
use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -101,8 +103,8 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
}
#[inline]
fn set_date(&self, dst: &mut BytesMut) {
self.settings.set_date(dst, true)
fn set_date(&mut self) {
self.settings.set_date(self.buffer.as_mut(), true)
}
#[inline]
@ -111,11 +113,11 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
}
fn start(
&mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse,
encoding: ContentEncoding,
&mut self, req: &Request, msg: &mut HttpResponse, encoding: ContentEncoding,
) -> io::Result<WriterState> {
// prepare task
self.buffer.for_server(req, msg, encoding);
let mut info = ResponseInfo::new(req.inner.method == Method::HEAD);
self.buffer.for_server(&mut info, &req.inner, msg, encoding);
if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
self.flags = Flags::STARTED | Flags::KEEPALIVE;
} else {
@ -123,7 +125,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
}
// Connection upgrade
let version = msg.version().unwrap_or_else(|| req.version);
let version = msg.version().unwrap_or_else(|| req.inner.version);
if msg.upgrade() {
self.flags.insert(Flags::UPGRADE);
msg.headers_mut()
@ -166,16 +168,29 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
helpers::write_status_line(version, msg.status().as_u16(), &mut buffer);
buffer.extend_from_slice(reason);
match body {
Body::Empty => if req.method != Method::HEAD {
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n");
} else {
buffer.extend_from_slice(b"\r\n");
},
Body::Binary(ref bytes) => {
helpers::write_content_length(bytes.len(), &mut buffer)
// content length
match info.length {
ResponseLength::Chunked => {
buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n")
}
_ => buffer.extend_from_slice(b"\r\n"),
ResponseLength::Zero => {
buffer.extend_from_slice(b"\r\ncontent-length: 0\r\n")
}
ResponseLength::Length(len) => {
helpers::write_content_length(len, &mut buffer)
}
ResponseLength::Length64(len) => {
let s = format!("{}", len);
buffer.extend_from_slice(b"\r\ncontent-length: ");
buffer.extend_from_slice(s.as_ref());
buffer.extend_from_slice(b"\r\n");
}
ResponseLength::None => buffer.extend_from_slice(b"\r\n"),
}
if let Some(ce) = info.content_encoding {
buffer.extend_from_slice(b"content-encoding: ");
buffer.extend_from_slice(ce.as_ref());
buffer.extend_from_slice(b"\r\n");
}
// write headers
@ -185,9 +200,13 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
unsafe {
let mut buf = &mut *(buffer.bytes_mut() as *mut [u8]);
for (key, value) in msg.headers() {
if is_bin && key == CONTENT_LENGTH {
is_bin = false;
continue;
match *key {
TRANSFER_ENCODING | CONTENT_ENCODING => continue,
CONTENT_LENGTH => match info.length {
ResponseLength::None => (),
_ => continue,
},
_ => (),
}
has_date = has_date || key == DATE;
let v = value.as_ref();

View File

@ -14,6 +14,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use error::{Error, PayloadError};
use http::{StatusCode, Version};
use httpmessage::HttpMessage;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -21,6 +22,7 @@ use payload::{Payload, PayloadStatus, PayloadWriter};
use pipeline::Pipeline;
use uri::Url;
use super::error::ServerError;
use super::h2writer::H2Writer;
use super::input::PayloadType;
use super::settings::WorkerSettings;
@ -331,34 +333,35 @@ impl<H: HttpHandler + 'static> Entry<H> {
// Payload and Content-Encoding
let (psender, payload) = Payload::new(false);
let mut msg = settings.get_http_message();
msg.get_mut().url = Url::new(parts.uri);
msg.get_mut().method = parts.method;
msg.get_mut().version = parts.version;
msg.get_mut().headers = parts.headers;
msg.get_mut().payload = Some(payload);
msg.get_mut().addr = addr;
let mut req = HttpRequest::from_message(msg);
let mut msg = settings.get_request_context();
msg.inner.url = Url::new(parts.uri);
msg.inner.method = parts.method;
msg.inner.version = parts.version;
msg.inner.headers = parts.headers;
*msg.inner.payload.borrow_mut() = Some(payload);
msg.inner.addr = addr;
// Payload sender
let psender = PayloadType::new(req.headers(), psender);
let psender = PayloadType::new(msg.headers(), psender);
// start request processing
let mut task = None;
for h in settings.handlers().iter_mut() {
req = match h.handle(req) {
msg = match h.handle(msg) {
Ok(t) => {
task = Some(t);
break;
}
Err(req) => req,
Err(msg) => msg,
}
}
Entry {
task: task.map(EntryPipe::Task).unwrap_or_else(|| {
EntryPipe::Error(Pipeline::error(HttpResponse::NotFound()))
EntryPipe::Error(ServerError::err(
Version::HTTP_2,
StatusCode::NOT_FOUND,
))
}),
payload: psender,
stream: H2Writer::new(resp, Rc::clone(settings)),

View File

@ -9,15 +9,15 @@ use std::rc::Rc;
use std::{cmp, io};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use http::{HttpTryFrom, Version};
use http::{HttpTryFrom, Method, Version};
use super::helpers;
use super::output::Output;
use super::message::Request;
use super::output::{Output, ResponseInfo};
use super::settings::WorkerSettings;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body};
use header::ContentEncoding;
use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
const CHUNK_SIZE: usize = 16_384;
@ -75,8 +75,8 @@ impl<H: 'static> Writer for H2Writer<H> {
}
#[inline]
fn set_date(&self, dst: &mut BytesMut) {
self.settings.set_date(dst, true)
fn set_date(&mut self) {
self.settings.set_date(self.buffer.as_mut(), true)
}
#[inline]
@ -85,12 +85,12 @@ impl<H: 'static> Writer for H2Writer<H> {
}
fn start(
&mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse,
encoding: ContentEncoding,
&mut self, req: &Request, msg: &mut HttpResponse, encoding: ContentEncoding,
) -> io::Result<WriterState> {
// prepare response
self.flags.insert(Flags::STARTED);
self.buffer.for_server(req, msg, encoding);
let mut info = ResponseInfo::new(req.inner.method == Method::HEAD);
self.buffer.for_server(&mut info, &req.inner, msg, encoding);
// http2 specific
msg.headers_mut().remove(CONNECTION);

View File

@ -1,91 +1,7 @@
use bytes::{BufMut, BytesMut};
use http::Version;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::{mem, ptr, slice};
use httprequest::HttpInnerMessage;
/// Internal use only!
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpInnerMessage>>>);
impl SharedMessagePool {
pub fn new() -> SharedMessagePool {
SharedMessagePool(RefCell::new(VecDeque::with_capacity(128)))
}
#[inline]
pub fn get(&self) -> Rc<HttpInnerMessage> {
if let Some(msg) = self.0.borrow_mut().pop_front() {
msg
} else {
Rc::new(HttpInnerMessage::default())
}
}
#[inline]
pub fn release(&self, mut msg: Rc<HttpInnerMessage>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut msg).unwrap().reset();
v.push_front(msg);
}
}
}
pub(crate) struct SharedHttpInnerMessage(
Option<Rc<HttpInnerMessage>>,
Option<Rc<SharedMessagePool>>,
);
impl Drop for SharedHttpInnerMessage {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(msg) = self.0.take() {
if Rc::strong_count(&msg) == 1 {
pool.release(msg);
}
}
}
}
}
impl Clone for SharedHttpInnerMessage {
fn clone(&self) -> SharedHttpInnerMessage {
SharedHttpInnerMessage(self.0.clone(), self.1.clone())
}
}
impl Default for SharedHttpInnerMessage {
fn default() -> SharedHttpInnerMessage {
SharedHttpInnerMessage(Some(Rc::new(HttpInnerMessage::default())), None)
}
}
impl SharedHttpInnerMessage {
pub fn from_message(msg: HttpInnerMessage) -> SharedHttpInnerMessage {
SharedHttpInnerMessage(Some(Rc::new(msg)), None)
}
pub fn new(
msg: Rc<HttpInnerMessage>, pool: Rc<SharedMessagePool>,
) -> SharedHttpInnerMessage {
SharedHttpInnerMessage(Some(msg), Some(pool))
}
#[inline]
pub fn get_mut(&mut self) -> &mut HttpInnerMessage {
let r: &HttpInnerMessage = self.0.as_ref().unwrap().as_ref();
unsafe { &mut *(r as *const _ as *mut _) }
}
#[inline]
pub fn get_ref(&self) -> &HttpInnerMessage {
self.0.as_ref().unwrap()
}
}
const DEC_DIGITS_LUT: &[u8] = b"0001020304050607080910111213141516171819\
2021222324252627282930313233343536373839\
4041424344454647484950515253545556575859\

220
src/server/message.rs Normal file
View File

@ -0,0 +1,220 @@
use std::cell::{Cell, Ref, RefCell, RefMut};
use std::collections::VecDeque;
use std::net::SocketAddr;
use http::{header, HeaderMap, Method, Uri, Version};
use extensions::Extensions;
use httpmessage::HttpMessage;
use info::ConnectionInfo;
use payload::Payload;
use server::ServerSettings;
use uri::Url as InnerUrl;
bitflags! {
pub(crate) struct MessageFlags: u8 {
const KEEPALIVE = 0b0000_0001;
const CONN_INFO = 0b0000_0010;
}
}
/// Request's context
pub struct Request {
pub(crate) inner: Box<InnerRequest>,
}
pub(crate) struct InnerRequest {
pub(crate) version: Version,
pub(crate) method: Method,
pub(crate) url: InnerUrl,
pub(crate) flags: Cell<MessageFlags>,
pub(crate) headers: HeaderMap,
pub(crate) extensions: RefCell<Extensions>,
pub(crate) addr: Option<SocketAddr>,
pub(crate) info: RefCell<ConnectionInfo>,
pub(crate) payload: RefCell<Option<Payload>>,
pub(crate) settings: ServerSettings,
}
impl HttpMessage for Request {
type Stream = Payload;
fn headers(&self) -> &HeaderMap {
&self.inner.headers
}
#[inline]
fn payload(&self) -> Payload {
if let Some(payload) = self.inner.payload.borrow_mut().take() {
payload
} else {
Payload::empty()
}
}
}
impl Request {
/// Create new RequestContext instance
pub fn new(settings: ServerSettings) -> Request {
Request {
inner: Box::new(InnerRequest {
settings,
method: Method::GET,
url: InnerUrl::default(),
version: Version::HTTP_11,
headers: HeaderMap::with_capacity(16),
flags: Cell::new(MessageFlags::empty()),
addr: None,
info: RefCell::new(ConnectionInfo::default()),
payload: RefCell::new(None),
extensions: RefCell::new(Extensions::new()),
}),
}
}
#[inline]
pub(crate) fn url(&self) -> &InnerUrl {
&self.inner.url
}
/// Read the Request Uri.
#[inline]
pub fn uri(&self) -> &Uri {
self.inner.url.uri()
}
/// Read the Request method.
#[inline]
pub fn method(&self) -> &Method {
&self.inner.method
}
/// Read the Request Version.
#[inline]
pub fn version(&self) -> Version {
self.inner.version
}
/// The target path of this Request.
#[inline]
pub fn path(&self) -> &str {
self.inner.url.path()
}
#[inline]
/// Returns Request's headers.
pub fn headers(&self) -> &HeaderMap {
&self.inner.headers
}
#[inline]
/// Returns mutable Request's headers.
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.inner.headers
}
/// Peer socket address
///
/// Peer address is actual socket address, if proxy is used in front of
/// actix http server, then peer address would be address of this proxy.
///
/// To get client connection information `connection_info()` method should
/// be used.
pub fn peer_addr(&self) -> Option<SocketAddr> {
self.inner.addr
}
/// Checks if a connection should be kept alive.
#[inline]
pub fn keep_alive(&self) -> bool {
self.inner.flags.get().contains(MessageFlags::KEEPALIVE)
}
/// Request extensions
#[inline]
pub fn extensions(&self) -> Ref<Extensions> {
self.inner.extensions.borrow()
}
/// Mutable reference to a the request's extensions
#[inline]
pub fn extensions_mut(&self) -> RefMut<Extensions> {
self.inner.extensions.borrow_mut()
}
/// Check if request requires connection upgrade
pub fn upgrade(&self) -> bool {
if let Some(conn) = self.inner.headers.get(header::CONNECTION) {
if let Ok(s) = conn.to_str() {
return s.to_lowercase().contains("upgrade");
}
}
self.inner.method == Method::CONNECT
}
/// Get *ConnectionInfo* for the correct request.
pub fn connection_info(&self) -> Ref<ConnectionInfo> {
if self.inner.flags.get().contains(MessageFlags::CONN_INFO) {
self.inner.info.borrow()
} else {
let mut flags = self.inner.flags.get();
flags.insert(MessageFlags::CONN_INFO);
self.inner.flags.set(flags);
self.inner.info.borrow_mut().update(self);
self.inner.info.borrow()
}
}
/// Server settings
#[inline]
pub fn server_settings(&self) -> &ServerSettings {
&self.inner.settings
}
#[inline]
pub(crate) fn reset(&mut self) {
self.inner.headers.clear();
self.inner.extensions.borrow_mut().clear();
self.inner.flags.set(MessageFlags::empty());
*self.inner.payload.borrow_mut() = None;
}
}
pub(crate) struct RequestPool(RefCell<VecDeque<Request>>, RefCell<ServerSettings>);
thread_local!(static POOL: &'static RequestPool = RequestPool::create());
impl RequestPool {
fn create() -> &'static RequestPool {
let pool = RequestPool(
RefCell::new(VecDeque::with_capacity(128)),
RefCell::new(ServerSettings::default()),
);
Box::leak(Box::new(pool))
}
pub fn pool(settings: ServerSettings) -> &'static RequestPool {
POOL.with(|p| {
*p.1.borrow_mut() = settings;
*p
})
}
#[inline]
pub fn get(&self) -> Request {
if let Some(msg) = self.0.borrow_mut().pop_front() {
msg
} else {
Request::new(self.1.borrow().clone())
}
}
#[inline]
pub fn release(&self, mut msg: Request) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
msg.reset();
v.push_front(msg);
}
}
}

View File

@ -8,6 +8,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream;
mod channel;
mod error;
pub(crate) mod h1;
pub(crate) mod h1decoder;
mod h1writer;
@ -15,11 +16,13 @@ mod h2;
mod h2writer;
pub(crate) mod helpers;
pub(crate) mod input;
pub(crate) mod message;
pub(crate) mod output;
pub(crate) mod settings;
mod srv;
mod worker;
pub use self::message::Request;
pub use self::settings::ServerSettings;
pub use self::srv::HttpServer;
@ -30,7 +33,7 @@ use actix::Message;
use body::Binary;
use error::Error;
use header::ContentEncoding;
use httprequest::{HttpInnerMessage, HttpRequest};
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
/// max buffer size 64k
@ -128,13 +131,13 @@ pub trait HttpHandler: 'static {
type Task: HttpHandlerTask;
/// Handle request
fn handle(&self, req: HttpRequest) -> Result<Self::Task, HttpRequest>;
fn handle(&self, req: Request) -> Result<Self::Task, Request>;
}
impl HttpHandler for Box<HttpHandler<Task = Box<HttpHandlerTask>>> {
type Task = Box<HttpHandlerTask>;
fn handle(&self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
fn handle(&self, req: Request) -> Result<Box<HttpHandlerTask>, Request> {
self.as_ref().handle(req)
}
}
@ -165,13 +168,13 @@ pub trait IntoHttpHandler {
type Handler: HttpHandler;
/// Convert into `HttpHandler` object.
fn into_handler(self, settings: ServerSettings) -> Self::Handler;
fn into_handler(self) -> Self::Handler;
}
impl<T: HttpHandler> IntoHttpHandler for T {
type Handler = T;
fn into_handler(self, _: ServerSettings) -> Self::Handler {
fn into_handler(self) -> Self::Handler {
self
}
}
@ -190,14 +193,13 @@ pub trait Writer {
fn written(&self) -> u64;
#[doc(hidden)]
fn set_date(&self, st: &mut BytesMut);
fn set_date(&mut self);
#[doc(hidden)]
fn buffer(&mut self) -> &mut BytesMut;
fn start(
&mut self, req: &mut HttpInnerMessage, resp: &mut HttpResponse,
encoding: ContentEncoding,
&mut self, req: &Request, resp: &mut HttpResponse, encoding: ContentEncoding,
) -> io::Result<WriterState>;
fn write(&mut self, payload: &Binary) -> io::Result<WriterState>;

View File

@ -15,11 +15,37 @@ use http::header::{
};
use http::{HttpTryFrom, Method, Version};
use super::message::{InnerRequest, Request};
use body::{Binary, Body};
use header::ContentEncoding;
use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
#[derive(Debug)]
pub(crate) enum ResponseLength {
Chunked,
Zero,
Length(usize),
Length64(u64),
None,
}
#[derive(Debug)]
pub(crate) struct ResponseInfo {
head: bool,
pub length: ResponseLength,
pub content_encoding: Option<&'static str>,
}
impl ResponseInfo {
pub fn new(head: bool) -> Self {
ResponseInfo {
head,
length: ResponseLength::None,
content_encoding: None,
}
}
}
#[derive(Debug)]
pub(crate) enum Output {
Empty(BytesMut),
@ -119,13 +145,12 @@ impl Output {
}
}
pub fn for_server(
&mut self, req: &HttpInnerMessage, resp: &mut HttpResponse,
pub(crate) fn for_server(
&mut self, info: &mut ResponseInfo, req: &InnerRequest, resp: &mut HttpResponse,
response_encoding: ContentEncoding,
) {
let buf = self.take();
let version = resp.version().unwrap_or_else(|| req.version);
let is_head = req.method == Method::HEAD;
let mut len = 0;
#[cfg_attr(feature = "cargo-clippy", allow(match_ref_pats))]
@ -158,10 +183,7 @@ impl Output {
encoding => encoding,
};
if encoding.is_compression() {
resp.headers_mut().insert(
CONTENT_ENCODING,
HeaderValue::from_static(encoding.as_str()),
);
info.content_encoding = Some(encoding.as_str());
}
encoding
} else {
@ -173,8 +195,8 @@ impl Output {
#[cfg_attr(feature = "cargo-clippy", allow(match_ref_pats))]
let transfer = match resp.body() {
&Body::Empty => {
if req.method != Method::HEAD {
resp.headers_mut().remove(CONTENT_LENGTH);
if !info.head {
info.length = ResponseLength::Zero;
}
*self = Output::Empty(buf);
return;
@ -216,13 +238,8 @@ impl Output {
}
}
if is_head {
let mut b = BytesMut::new();
let _ = write!(b, "{}", len);
resp.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::try_from(b.freeze()).unwrap(),
);
info.length = ResponseLength::Length(len);
if info.head {
*self = Output::Empty(buf);
} else {
*self = Output::Buffer(buf);
@ -236,7 +253,7 @@ impl Output {
}
if encoding != ContentEncoding::Identity {
encoding = ContentEncoding::Identity;
resp.headers_mut().remove(CONTENT_ENCODING);
info.content_encoding.take();
}
TransferEncoding::eof(buf)
} else {
@ -245,12 +262,12 @@ impl Output {
{
resp.headers_mut().remove(CONTENT_LENGTH);
}
Output::streaming_encoding(buf, version, resp)
Output::streaming_encoding(info, buf, version, resp)
}
}
};
// check for head response
if is_head {
if info.head {
resp.set_body(Body::Empty);
*self = Output::Empty(transfer.buf.unwrap());
return;
@ -277,18 +294,17 @@ impl Output {
}
fn streaming_encoding(
buf: BytesMut, version: Version, resp: &mut HttpResponse,
info: &mut ResponseInfo, buf: BytesMut, version: Version,
resp: &mut HttpResponse,
) -> TransferEncoding {
match resp.chunked() {
Some(true) => {
// Enable transfer encoding
resp.headers_mut().remove(CONTENT_LENGTH);
if version == Version::HTTP_2 {
resp.headers_mut().remove(TRANSFER_ENCODING);
info.length = ResponseLength::None;
TransferEncoding::eof(buf)
} else {
resp.headers_mut()
.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
info.length = ResponseLength::Chunked;
TransferEncoding::chunked(buf)
}
}
@ -315,6 +331,7 @@ impl Output {
if !chunked {
if let Some(len) = len {
info.length = ResponseLength::Length64(len);
TransferEncoding::length(len, buf)
} else {
TransferEncoding::eof(buf)
@ -323,14 +340,11 @@ impl Output {
// Enable transfer encoding
match version {
Version::HTTP_11 => {
resp.headers_mut().insert(
TRANSFER_ENCODING,
HeaderValue::from_static("chunked"),
);
info.length = ResponseLength::Chunked;
TransferEncoding::chunked(buf)
}
_ => {
resp.headers_mut().remove(TRANSFER_ENCODING);
info.length = ResponseLength::None;
TransferEncoding::eof(buf)
}
}

View File

@ -12,6 +12,7 @@ use time;
use super::channel::Node;
use super::helpers;
use super::message::{Request, RequestPool};
use super::KeepAlive;
use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
@ -156,14 +157,17 @@ pub(crate) struct WorkerSettings<H> {
keep_alive: u64,
ka_enabled: bool,
bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
messages: &'static RequestPool,
channels: Cell<usize>,
node: Box<Node<()>>,
date: UnsafeCell<Date>,
settings: ServerSettings,
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: KeepAlive) -> WorkerSettings<H> {
pub(crate) fn new(
h: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true),
KeepAlive::Os | KeepAlive::Tcp(_) => (0, true),
@ -171,14 +175,15 @@ impl<H> WorkerSettings<H> {
};
WorkerSettings {
keep_alive,
ka_enabled,
h: RefCell::new(h),
bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
messages: RequestPool::pool(settings.clone()),
channels: Cell::new(0),
node: Box::new(Node::head()),
date: UnsafeCell::new(Date::new()),
keep_alive,
ka_enabled,
settings,
}
}
@ -210,11 +215,8 @@ impl<H> WorkerSettings<H> {
self.bytes.release_bytes(bytes)
}
pub fn get_http_message(&self) -> helpers::SharedHttpInnerMessage {
helpers::SharedHttpInnerMessage::new(
self.messages.get(),
Rc::clone(&self.messages),
)
pub fn get_request_context(&self) -> Request {
self.messages.get()
}
pub fn add_channel(&self) {
@ -316,7 +318,11 @@ mod tests {
#[test]
fn test_date() {
let settings = WorkerSettings::<()>::new(Vec::new(), KeepAlive::Os);
let settings = WorkerSettings::<()>::new(
Vec::new(),
KeepAlive::Os,
ServerSettings::default(),
);
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf1, true);
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);

View File

@ -358,12 +358,10 @@ where
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let s = ServerSettings::from_parts(parts);
let apps: Vec<_> = (*factory)()
.into_iter()
.map(|h| h.into_handler(s.clone()))
.collect();
let apps: Vec<_> =
(*factory)().into_iter().map(|h| h.into_handler()).collect();
ctx.add_message_stream(rx);
Worker::new(apps, socks, ka)
Worker::new(apps, socks, ka, s)
});
workers.push((idx, tx));
self.workers.push((idx, addr));
@ -404,7 +402,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// fn main() {
/// let sys = actix::System::new("example"); // <- create Actix system
///
/// server::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok())))
/// server::new(|| App::new().resource("/", |r| r.h(|_: &_| HttpResponse::Ok())))
/// .bind("127.0.0.1:0")
/// .expect("Can not bind to 127.0.0.1:0")
/// .start();
@ -559,9 +557,13 @@ impl<H: IntoHttpHandler> HttpServer<H> {
let settings = ServerSettings::new(Some(addr), &self.host, secure);
let apps: Vec<_> = (*self.factory)()
.into_iter()
.map(|h| h.into_handler(settings.clone()))
.map(|h| h.into_handler())
.collect();
self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive)));
self.h = Some(Rc::new(WorkerSettings::new(
apps,
self.keep_alive,
settings,
)));
// start server
let signals = self.subscribe_to_signals();
@ -645,12 +647,10 @@ impl<H: IntoHttpHandler> StreamHandler2<ServerCommand, ()> for HttpServer<H> {
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let settings = ServerSettings::new(Some(addr), &host, false);
let apps: Vec<_> = (*factory)()
.into_iter()
.map(|h| h.into_handler(settings.clone()))
.collect();
let apps: Vec<_> =
(*factory)().into_iter().map(|h| h.into_handler()).collect();
ctx.add_message_stream(rx);
Worker::new(apps, socks, ka)
Worker::new(apps, socks, ka, settings)
});
for item in &self.accept {
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));

View File

@ -25,7 +25,7 @@ use actix::msgs::StopArbiter;
use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Response};
use server::channel::HttpChannel;
use server::settings::WorkerSettings;
use server::settings::{ServerSettings, WorkerSettings};
use server::{HttpHandler, KeepAlive};
#[derive(Message)]
@ -68,6 +68,7 @@ where
impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new(
h: Vec<H>, socks: Slab<SocketInfo>, keep_alive: KeepAlive,
settings: ServerSettings,
) -> Worker<H> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
@ -76,7 +77,7 @@ impl<H: HttpHandler + 'static> Worker<H> {
};
Worker {
settings: Rc::new(WorkerSettings::new(h, keep_alive)),
settings: Rc::new(WorkerSettings::new(h, keep_alive, settings)),
socks,
tcp_ka,
}