mirror of
https://github.com/actix/actix-extras.git
synced 2025-01-22 23:05:56 +01:00
various cleanups
This commit is contained in:
parent
55204c829c
commit
d4187f682b
@ -91,7 +91,6 @@ serde_derive = "1.0"
|
|||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
skeptic = "0.13"
|
skeptic = "0.13"
|
||||||
serde_derive = "1.0"
|
|
||||||
version_check = "0.1"
|
version_check = "0.1"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
|
@ -25,7 +25,7 @@ pub(crate) trait IoContext: 'static {
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum Frame {
|
pub(crate) enum Frame {
|
||||||
Message(HttpResponse),
|
Message(Box<HttpResponse>),
|
||||||
Payload(Option<Binary>),
|
Payload(Option<Binary>),
|
||||||
Drain(Rc<RefCell<DrainFut>>),
|
Drain(Rc<RefCell<DrainFut>>),
|
||||||
}
|
}
|
||||||
@ -141,7 +141,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
|
|||||||
Body::StreamingContext | Body::UpgradeContext => self.streaming = true,
|
Body::StreamingContext | Body::UpgradeContext => self.streaming = true,
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
self.stream.push_back(Frame::Message(resp))
|
self.stream.push_back(Frame::Message(Box::new(resp)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write payload
|
/// Write payload
|
||||||
|
@ -125,9 +125,9 @@ impl PayloadWriter for PayloadType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
enum Decoder {
|
enum Decoder {
|
||||||
Deflate(DeflateDecoder<Writer<BytesMut>>),
|
Deflate(Box<DeflateDecoder<Writer<BytesMut>>>),
|
||||||
Gzip(Option<GzDecoder<Wrapper>>),
|
Gzip(Box<Option<GzDecoder<Wrapper>>>),
|
||||||
Br(BrotliDecoder<Writer<BytesMut>>),
|
Br(Box<BrotliDecoder<Writer<BytesMut>>>),
|
||||||
Identity,
|
Identity,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -158,10 +158,10 @@ impl EncodedPayload {
|
|||||||
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
|
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
|
||||||
let dec = match enc {
|
let dec = match enc {
|
||||||
ContentEncoding::Br => Decoder::Br(
|
ContentEncoding::Br => Decoder::Br(
|
||||||
BrotliDecoder::new(BytesMut::with_capacity(8192).writer())),
|
Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))),
|
||||||
ContentEncoding::Deflate => Decoder::Deflate(
|
ContentEncoding::Deflate => Decoder::Deflate(
|
||||||
DeflateDecoder::new(BytesMut::with_capacity(8192).writer())),
|
Box::new(DeflateDecoder::new(BytesMut::with_capacity(8192).writer()))),
|
||||||
ContentEncoding::Gzip => Decoder::Gzip(None),
|
ContentEncoding::Gzip => Decoder::Gzip(Box::new(None)),
|
||||||
_ => Decoder::Identity,
|
_ => Decoder::Identity,
|
||||||
};
|
};
|
||||||
EncodedPayload {
|
EncodedPayload {
|
||||||
@ -204,13 +204,13 @@ impl PayloadWriter for EncodedPayload {
|
|||||||
}
|
}
|
||||||
loop {
|
loop {
|
||||||
let len = self.dst.get_ref().len();
|
let len = self.dst.get_ref().len();
|
||||||
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
|
let len_buf = decoder.as_mut().as_mut().unwrap().get_mut().buf.len();
|
||||||
|
|
||||||
if len < len_buf * 2 {
|
if len < len_buf * 2 {
|
||||||
self.dst.get_mut().reserve(len_buf * 2 - len);
|
self.dst.get_mut().reserve(len_buf * 2 - len);
|
||||||
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
||||||
}
|
}
|
||||||
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
match decoder.as_mut().as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
self.inner.feed_eof();
|
self.inner.feed_eof();
|
||||||
@ -271,13 +271,13 @@ impl PayloadWriter for EncodedPayload {
|
|||||||
if decoder.is_none() {
|
if decoder.is_none() {
|
||||||
let mut buf = BytesMut::new();
|
let mut buf = BytesMut::new();
|
||||||
buf.extend(data);
|
buf.extend(data);
|
||||||
*decoder = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap());
|
*(decoder.as_mut()) = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap());
|
||||||
} else {
|
} else {
|
||||||
decoder.as_mut().unwrap().get_mut().buf.extend(data);
|
decoder.as_mut().as_mut().unwrap().get_mut().buf.extend(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
|
let len_buf = decoder.as_mut().as_mut().unwrap().get_mut().buf.len();
|
||||||
if len_buf == 0 {
|
if len_buf == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -287,7 +287,7 @@ impl PayloadWriter for EncodedPayload {
|
|||||||
self.dst.get_mut().reserve(len_buf * 2 - len);
|
self.dst.get_mut().reserve(len_buf * 2 - len);
|
||||||
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
||||||
}
|
}
|
||||||
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
match decoder.as_mut().as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
return
|
return
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::fmt::Write;
|
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
use tokio_io::AsyncWrite;
|
use tokio_io::AsyncWrite;
|
||||||
use http::{Version, StatusCode};
|
use http::Version;
|
||||||
use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, DATE};
|
use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, DATE};
|
||||||
|
|
||||||
use date;
|
use date;
|
||||||
@ -151,11 +150,17 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
|||||||
buffer.reserve(100 + msg.headers().len() * AVERAGE_HEADER_SIZE);
|
buffer.reserve(100 + msg.headers().len() * AVERAGE_HEADER_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
if version == Version::HTTP_11 && msg.status() == StatusCode::OK {
|
match version {
|
||||||
buffer.extend(b"HTTP/1.1 200 OK\r\n");
|
Version::HTTP_11 => buffer.extend(b"HTTP/1.1 "),
|
||||||
} else {
|
Version::HTTP_2 => buffer.extend(b"HTTP/2.0 "),
|
||||||
let _ = write!(buffer, "{:?} {}\r\n", version, msg.status());
|
Version::HTTP_10 => buffer.extend(b"HTTP/1.0 "),
|
||||||
|
Version::HTTP_09 => buffer.extend(b"HTTP/0.9 "),
|
||||||
}
|
}
|
||||||
|
buffer.extend(msg.status().as_u16().to_string().as_bytes());
|
||||||
|
buffer.extend(b" ");
|
||||||
|
buffer.extend(msg.reason().as_bytes());
|
||||||
|
buffer.extend(b"\r\n");
|
||||||
|
|
||||||
for (key, value) in msg.headers() {
|
for (key, value) in msg.headers() {
|
||||||
let t: &[u8] = key.as_ref();
|
let t: &[u8] = key.as_ref();
|
||||||
buffer.extend(t);
|
buffer.extend(t);
|
||||||
|
@ -50,7 +50,7 @@ impl<F, R, S> Handler<S> for F
|
|||||||
pub struct Reply(ReplyItem);
|
pub struct Reply(ReplyItem);
|
||||||
|
|
||||||
pub(crate) enum ReplyItem {
|
pub(crate) enum ReplyItem {
|
||||||
Message(HttpResponse),
|
Message(Box<HttpResponse>),
|
||||||
Actor(Box<IoContext>),
|
Actor(Box<IoContext>),
|
||||||
Future(Box<Future<Item=HttpResponse, Error=Error>>),
|
Future(Box<Future<Item=HttpResponse, Error=Error>>),
|
||||||
}
|
}
|
||||||
@ -76,7 +76,7 @@ impl Reply {
|
|||||||
/// Send response
|
/// Send response
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn response<R: Into<HttpResponse>>(response: R) -> Reply {
|
pub fn response<R: Into<HttpResponse>>(response: R) -> Reply {
|
||||||
Reply(ReplyItem::Message(response.into()))
|
Reply(ReplyItem::Message(Box::new(response.into())))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -107,14 +107,14 @@ impl FromRequest for HttpResponse {
|
|||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn from_request(self, _: HttpRequest) -> Result<Reply, Error> {
|
fn from_request(self, _: HttpRequest) -> Result<Reply, Error> {
|
||||||
Ok(Reply(ReplyItem::Message(self)))
|
Ok(Reply(ReplyItem::Message(Box::new(self))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<HttpResponse> for Reply {
|
impl From<HttpResponse> for Reply {
|
||||||
|
|
||||||
fn from(resp: HttpResponse) -> Reply {
|
fn from(resp: HttpResponse) -> Reply {
|
||||||
Reply(ReplyItem::Message(resp))
|
Reply(ReplyItem::Message(Box::new(resp)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -138,7 +138,7 @@ impl<E: Into<Error>> From<Result<Reply, E>> for Reply {
|
|||||||
fn from(res: Result<Reply, E>) -> Self {
|
fn from(res: Result<Reply, E>) -> Self {
|
||||||
match res {
|
match res {
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
Err(err) => Reply(ReplyItem::Message(err.into().into())),
|
Err(err) => Reply(ReplyItem::Message(Box::new(err.into().into()))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -166,7 +166,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_with_reason() {
|
fn test_with_reason() {
|
||||||
let resp = HTTPOk.response();
|
let resp = HTTPOk.response();
|
||||||
assert_eq!(resp.reason(), "");
|
assert_eq!(resp.reason(), "OK");
|
||||||
|
|
||||||
let resp = HTTPBadRequest.with_reason("test");
|
let resp = HTTPBadRequest.with_reason("test");
|
||||||
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
||||||
|
@ -119,7 +119,7 @@ impl HttpResponse {
|
|||||||
if let Some(reason) = self.reason {
|
if let Some(reason) = self.reason {
|
||||||
reason
|
reason
|
||||||
} else {
|
} else {
|
||||||
""
|
self.status.canonical_reason().unwrap_or("<unknown status code>")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ impl DefaultHeaders {
|
|||||||
|
|
||||||
impl<S> Middleware<S> for DefaultHeaders {
|
impl<S> Middleware<S> for DefaultHeaders {
|
||||||
|
|
||||||
fn response(&self, _: &mut HttpRequest<S>, mut resp: HttpResponse) -> Response {
|
fn response(&self, _: &mut HttpRequest<S>, mut resp: Box<HttpResponse>) -> Response {
|
||||||
for (key, value) in self.0.iter() {
|
for (key, value) in self.0.iter() {
|
||||||
if !resp.headers().contains_key(key) {
|
if !resp.headers().contains_key(key) {
|
||||||
resp.headers_mut().insert(key, value.clone());
|
resp.headers_mut().insert(key, value.clone());
|
||||||
@ -97,14 +97,14 @@ mod tests {
|
|||||||
let mut req = HttpRequest::default();
|
let mut req = HttpRequest::default();
|
||||||
|
|
||||||
let resp = HttpResponse::Ok().finish().unwrap();
|
let resp = HttpResponse::Ok().finish().unwrap();
|
||||||
let resp = match mw.response(&mut req, resp) {
|
let resp = match mw.response(&mut req, Box::new(resp)) {
|
||||||
Response::Done(resp) => resp,
|
Response::Done(resp) => resp,
|
||||||
_ => panic!(),
|
_ => panic!(),
|
||||||
};
|
};
|
||||||
assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "0001");
|
assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "0001");
|
||||||
|
|
||||||
let resp = HttpResponse::Ok().header(CONTENT_TYPE, "0002").finish().unwrap();
|
let resp = HttpResponse::Ok().header(CONTENT_TYPE, "0002").finish().unwrap();
|
||||||
let resp = match mw.response(&mut req, resp) {
|
let resp = match mw.response(&mut req, Box::new(resp)) {
|
||||||
Response::Done(resp) => resp,
|
Response::Done(resp) => resp,
|
||||||
_ => panic!(),
|
_ => panic!(),
|
||||||
};
|
};
|
||||||
|
@ -21,7 +21,7 @@ pub enum Started {
|
|||||||
Err(Error),
|
Err(Error),
|
||||||
/// New http response got generated. If middleware generates response
|
/// New http response got generated. If middleware generates response
|
||||||
/// handler execution halts.
|
/// handler execution halts.
|
||||||
Response(HttpResponse),
|
Response(Box<HttpResponse>),
|
||||||
/// Execution completed, runs future to completion.
|
/// Execution completed, runs future to completion.
|
||||||
Future(Box<Future<Item=Option<HttpResponse>, Error=Error>>),
|
Future(Box<Future<Item=Option<HttpResponse>, Error=Error>>),
|
||||||
}
|
}
|
||||||
@ -31,7 +31,7 @@ pub enum Response {
|
|||||||
/// Moddleware error
|
/// Moddleware error
|
||||||
Err(Error),
|
Err(Error),
|
||||||
/// New http response got generated
|
/// New http response got generated
|
||||||
Done(HttpResponse),
|
Done(Box<HttpResponse>),
|
||||||
/// Result is a future that resolves to a new http response
|
/// Result is a future that resolves to a new http response
|
||||||
Future(Box<Future<Item=HttpResponse, Error=Error>>),
|
Future(Box<Future<Item=HttpResponse, Error=Error>>),
|
||||||
}
|
}
|
||||||
@ -56,7 +56,7 @@ pub trait Middleware<S> {
|
|||||||
|
|
||||||
/// Method is called when handler returns response,
|
/// Method is called when handler returns response,
|
||||||
/// but before sending http message to peer.
|
/// but before sending http message to peer.
|
||||||
fn response(&self, req: &mut HttpRequest<S>, resp: HttpResponse) -> Response {
|
fn response(&self, req: &mut HttpRequest<S>, resp: Box<HttpResponse>) -> Response {
|
||||||
Response::Done(resp)
|
Response::Done(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ impl<S: 'static, T: SessionBackend<S>> Middleware<S> for SessionStorage<T, S> {
|
|||||||
Started::Future(Box::new(fut))
|
Started::Future(Box::new(fut))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn response(&self, req: &mut HttpRequest<S>, resp: HttpResponse) -> Response {
|
fn response(&self, req: &mut HttpRequest<S>, resp: Box<HttpResponse>) -> Response {
|
||||||
if let Some(s_box) = req.extensions().remove::<Arc<SessionImplBox>>() {
|
if let Some(s_box) = req.extensions().remove::<Arc<SessionImplBox>>() {
|
||||||
s_box.0.write(resp)
|
s_box.0.write(resp)
|
||||||
} else {
|
} else {
|
||||||
@ -129,7 +129,7 @@ pub trait SessionImpl: 'static {
|
|||||||
fn clear(&mut self);
|
fn clear(&mut self);
|
||||||
|
|
||||||
/// Write session to storage backend.
|
/// Write session to storage backend.
|
||||||
fn write(&self, resp: HttpResponse) -> Response;
|
fn write(&self, resp: Box<HttpResponse>) -> Response;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Session's storage backend trait definition.
|
/// Session's storage backend trait definition.
|
||||||
@ -155,7 +155,7 @@ impl SessionImpl for DummySessionImpl {
|
|||||||
fn set(&mut self, key: &str, value: String) {}
|
fn set(&mut self, key: &str, value: String) {}
|
||||||
fn remove(&mut self, key: &str) {}
|
fn remove(&mut self, key: &str) {}
|
||||||
fn clear(&mut self) {}
|
fn clear(&mut self) {}
|
||||||
fn write(&self, resp: HttpResponse) -> Response {
|
fn write(&self, resp: Box<HttpResponse>) -> Response {
|
||||||
Response::Done(resp)
|
Response::Done(resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -205,7 +205,7 @@ impl SessionImpl for CookieSession {
|
|||||||
self.state.clear()
|
self.state.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write(&self, mut resp: HttpResponse) -> Response {
|
fn write(&self, mut resp: Box<HttpResponse>) -> Response {
|
||||||
if self.changed {
|
if self.changed {
|
||||||
let _ = self.inner.set_cookie(&mut resp, &self.state);
|
let _ = self.inner.set_cookie(&mut resp, &self.state);
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ pub trait FromParam: Sized {
|
|||||||
///
|
///
|
||||||
/// If resource path contains variable patterns, `Params` stores this variables.
|
/// If resource path contains variable patterns, `Params` stores this variables.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Params<'a>(SmallVec<[(&'a str, &'a str); 4]>);
|
pub struct Params<'a>(SmallVec<[(&'a str, &'a str); 3]>);
|
||||||
|
|
||||||
impl<'a> Default for Params<'a> {
|
impl<'a> Default for Params<'a> {
|
||||||
fn default() -> Params<'a> {
|
fn default() -> Params<'a> {
|
||||||
|
@ -141,7 +141,8 @@ impl<S> Pipeline<S> {
|
|||||||
impl Pipeline<()> {
|
impl Pipeline<()> {
|
||||||
pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> {
|
pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> {
|
||||||
Box::new(Pipeline(
|
Box::new(Pipeline(
|
||||||
PipelineInfo::new(HttpRequest::default()), ProcessResponse::init(err.into())))
|
PipelineInfo::new(
|
||||||
|
HttpRequest::default()), ProcessResponse::init(Box::new(err.into()))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -346,15 +347,15 @@ impl<S> StartMiddlewares<S> {
|
|||||||
fut: Some(fut)}),
|
fut: Some(fut)}),
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
if let Some(resp) = resp {
|
if let Some(resp) = resp {
|
||||||
return RunMiddlewares::init(info, resp);
|
return RunMiddlewares::init(info, Box::new(resp));
|
||||||
}
|
}
|
||||||
info.count += 1;
|
info.count += 1;
|
||||||
}
|
}
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
return ProcessResponse::init(err.into()),
|
return ProcessResponse::init(Box::new(err.into())),
|
||||||
},
|
},
|
||||||
Started::Err(err) =>
|
Started::Err(err) =>
|
||||||
return ProcessResponse::init(err.into()),
|
return ProcessResponse::init(Box::new(err.into())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -369,7 +370,7 @@ impl<S> StartMiddlewares<S> {
|
|||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
info.count += 1;
|
info.count += 1;
|
||||||
if let Some(resp) = resp {
|
if let Some(resp) = resp {
|
||||||
return Ok(RunMiddlewares::init(info, resp));
|
return Ok(RunMiddlewares::init(info, Box::new(resp)));
|
||||||
}
|
}
|
||||||
if info.count == len {
|
if info.count == len {
|
||||||
let reply = (unsafe{&*self.hnd})(info.req.clone());
|
let reply = (unsafe{&*self.hnd})(info.req.clone());
|
||||||
@ -387,13 +388,13 @@ impl<S> StartMiddlewares<S> {
|
|||||||
continue 'outer
|
continue 'outer
|
||||||
},
|
},
|
||||||
Started::Err(err) =>
|
Started::Err(err) =>
|
||||||
return Ok(ProcessResponse::init(err.into()))
|
return Ok(ProcessResponse::init(Box::new(err.into())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
return Ok(ProcessResponse::init(err.into()))
|
return Ok(ProcessResponse::init(Box::new(err.into())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -441,14 +442,14 @@ impl<S> WaitingResponse<S> {
|
|||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
error!("Unexpected eof");
|
error!("Unexpected eof");
|
||||||
let err: Error = UnexpectedTaskFrame.into();
|
let err: Error = UnexpectedTaskFrame.into();
|
||||||
return Ok(ProcessResponse::init(err.into()))
|
return Ok(ProcessResponse::init(Box::new(err.into())))
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
self.stream = PipelineResponse::Context(context);
|
self.stream = PipelineResponse::Context(context);
|
||||||
return Err(PipelineState::Handler(self))
|
return Err(PipelineState::Handler(self))
|
||||||
},
|
},
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
return Ok(ProcessResponse::init(err.into()))
|
return Ok(ProcessResponse::init(Box::new(err.into())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -459,9 +460,9 @@ impl<S> WaitingResponse<S> {
|
|||||||
Err(PipelineState::Handler(self))
|
Err(PipelineState::Handler(self))
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(response)) =>
|
Ok(Async::Ready(response)) =>
|
||||||
Ok(RunMiddlewares::init(info, response)),
|
Ok(RunMiddlewares::init(info, Box::new(response))),
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
Ok(ProcessResponse::init(err.into())),
|
Ok(ProcessResponse::init(Box::new(err.into()))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PipelineResponse::None => {
|
PipelineResponse::None => {
|
||||||
@ -481,7 +482,7 @@ struct RunMiddlewares<S> {
|
|||||||
|
|
||||||
impl<S> RunMiddlewares<S> {
|
impl<S> RunMiddlewares<S> {
|
||||||
|
|
||||||
fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S>
|
fn init(info: &mut PipelineInfo<S>, mut resp: Box<HttpResponse>) -> PipelineState<S>
|
||||||
{
|
{
|
||||||
if info.count == 0 {
|
if info.count == 0 {
|
||||||
return ProcessResponse::init(resp);
|
return ProcessResponse::init(resp);
|
||||||
@ -493,7 +494,7 @@ impl<S> RunMiddlewares<S> {
|
|||||||
resp = match info.mws[curr].response(info.req_mut(), resp) {
|
resp = match info.mws[curr].response(info.req_mut(), resp) {
|
||||||
Response::Err(err) => {
|
Response::Err(err) => {
|
||||||
info.count = curr + 1;
|
info.count = curr + 1;
|
||||||
return ProcessResponse::init(err.into())
|
return ProcessResponse::init(Box::new(err.into()))
|
||||||
}
|
}
|
||||||
Response::Done(r) => {
|
Response::Done(r) => {
|
||||||
curr += 1;
|
curr += 1;
|
||||||
@ -521,10 +522,10 @@ impl<S> RunMiddlewares<S> {
|
|||||||
return Ok(PipelineState::RunMiddlewares(self)),
|
return Ok(PipelineState::RunMiddlewares(self)),
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
self.curr += 1;
|
self.curr += 1;
|
||||||
resp
|
Box::new(resp)
|
||||||
}
|
}
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
return Ok(ProcessResponse::init(err.into())),
|
return Ok(ProcessResponse::init(Box::new(err.into()))),
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@ -533,7 +534,7 @@ impl<S> RunMiddlewares<S> {
|
|||||||
} else {
|
} else {
|
||||||
match info.mws[self.curr].response(info.req_mut(), resp) {
|
match info.mws[self.curr].response(info.req_mut(), resp) {
|
||||||
Response::Err(err) =>
|
Response::Err(err) =>
|
||||||
return Ok(ProcessResponse::init(err.into())),
|
return Ok(ProcessResponse::init(Box::new(err.into()))),
|
||||||
Response::Done(r) => {
|
Response::Done(r) => {
|
||||||
self.curr += 1;
|
self.curr += 1;
|
||||||
resp = r
|
resp = r
|
||||||
@ -550,7 +551,7 @@ impl<S> RunMiddlewares<S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ProcessResponse<S> {
|
struct ProcessResponse<S> {
|
||||||
resp: HttpResponse,
|
resp: Box<HttpResponse>,
|
||||||
iostate: IOState,
|
iostate: IOState,
|
||||||
running: RunningState,
|
running: RunningState,
|
||||||
drain: DrainVec,
|
drain: DrainVec,
|
||||||
@ -596,6 +597,7 @@ impl IOState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct DrainVec(Vec<Rc<RefCell<DrainFut>>>);
|
struct DrainVec(Vec<Rc<RefCell<DrainFut>>>);
|
||||||
|
|
||||||
impl Drop for DrainVec {
|
impl Drop for DrainVec {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
for drain in &mut self.0 {
|
for drain in &mut self.0 {
|
||||||
@ -606,7 +608,7 @@ impl Drop for DrainVec {
|
|||||||
|
|
||||||
impl<S> ProcessResponse<S> {
|
impl<S> ProcessResponse<S> {
|
||||||
|
|
||||||
fn init(resp: HttpResponse) -> PipelineState<S>
|
fn init(resp: Box<HttpResponse>) -> PipelineState<S>
|
||||||
{
|
{
|
||||||
PipelineState::Response(
|
PipelineState::Response(
|
||||||
ProcessResponse{ resp: resp,
|
ProcessResponse{ resp: resp,
|
||||||
@ -786,14 +788,14 @@ impl<S> ProcessResponse<S> {
|
|||||||
|
|
||||||
/// Middlewares start executor
|
/// Middlewares start executor
|
||||||
struct FinishingMiddlewares<S> {
|
struct FinishingMiddlewares<S> {
|
||||||
resp: HttpResponse,
|
resp: Box<HttpResponse>,
|
||||||
fut: Option<Box<Future<Item=(), Error=Error>>>,
|
fut: Option<Box<Future<Item=(), Error=Error>>>,
|
||||||
_s: PhantomData<S>,
|
_s: PhantomData<S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> FinishingMiddlewares<S> {
|
impl<S> FinishingMiddlewares<S> {
|
||||||
|
|
||||||
fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S> {
|
fn init(info: &mut PipelineInfo<S>, resp: Box<HttpResponse>) -> PipelineState<S> {
|
||||||
if info.count == 0 {
|
if info.count == 0 {
|
||||||
Completed::init(info)
|
Completed::init(info)
|
||||||
} else {
|
} else {
|
||||||
|
@ -267,6 +267,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
|
|||||||
};
|
};
|
||||||
let msg = IoStream{
|
let msg = IoStream{
|
||||||
io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
|
io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
|
||||||
|
println!("next: {}", next);
|
||||||
wrks[next].unbounded_send(msg).expect("worker thread died");
|
wrks[next].unbounded_send(msg).expect("worker thread died");
|
||||||
next = (next + 1) % wrks.len();
|
next = (next + 1) % wrks.len();
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ impl<S> middlewares::Middleware<S> for MiddlewareTest {
|
|||||||
middlewares::Started::Done
|
middlewares::Started::Done
|
||||||
}
|
}
|
||||||
|
|
||||||
fn response(&self, _: &mut HttpRequest<S>, resp: HttpResponse) -> middlewares::Response {
|
fn response(&self, _: &mut HttpRequest<S>, resp: Box<HttpResponse>) -> middlewares::Response {
|
||||||
self.response.store(self.response.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
|
self.response.store(self.response.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
|
||||||
middlewares::Response::Done(resp)
|
middlewares::Response::Done(resp)
|
||||||
}
|
}
|
||||||
@ -85,9 +85,9 @@ fn test_middlewares() {
|
|||||||
|
|
||||||
HttpServer::new(
|
HttpServer::new(
|
||||||
move || vec![Application::new()
|
move || vec![Application::new()
|
||||||
.middleware(MiddlewareTest{start: act_num1.clone(),
|
.middleware(MiddlewareTest{start: Arc::clone(&act_num1),
|
||||||
response: act_num2.clone(),
|
response: Arc::clone(&act_num2),
|
||||||
finish: act_num3.clone()})
|
finish: Arc::clone(&act_num3)})
|
||||||
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))])
|
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))])
|
||||||
.serve::<_, ()>("127.0.0.1:58904").unwrap();
|
.serve::<_, ()>("127.0.0.1:58904").unwrap();
|
||||||
sys.run();
|
sys.run();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user