1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-08-31 08:57:00 +02:00

replace cloneable service with httpflow abstraction (#1876)

This commit is contained in:
fakeshadow
2021-01-07 02:43:52 +08:00
committed by GitHub
parent 57a3722146
commit a03dbe2dcf
7 changed files with 190 additions and 187 deletions

View File

@@ -1,9 +1,11 @@
use std::{
cell::RefCell,
collections::VecDeque,
fmt,
future::Future,
io, mem, net,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
@@ -15,17 +17,14 @@ use bytes::{Buf, BytesMut};
use log::{error, trace};
use pin_project::pin_project;
use crate::cloneable::CloneableService;
use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::config::ServiceConfig;
use crate::error::{DispatchError, Error};
use crate::error::{ParseError, PayloadError};
use crate::httpmessage::HttpMessage;
use crate::request::Request;
use crate::response::Response;
use crate::{
body::{Body, BodySize, MessageBody, ResponseBody},
Extensions,
};
use crate::service::HttpFlow;
use crate::OnConnectData;
use super::codec::Codec;
use super::payload::{Payload, PayloadSender, PayloadStatus};
@@ -78,7 +77,7 @@ where
U::Error: fmt::Display,
{
Normal(#[pin] InnerDispatcher<T, S, B, X, U>),
Upgrade(Pin<Box<U::Future>>),
Upgrade(#[pin] U::Future),
}
#[pin_project(project = InnerDispatcherProj)]
@@ -92,10 +91,8 @@ where
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
on_connect_data: Extensions,
services: Rc<RefCell<HttpFlow<S, X, U>>>,
on_connect_data: OnConnectData,
flags: Flags,
peer_addr: Option<net::SocketAddr>,
error: Option<DispatchError>,
@@ -180,10 +177,8 @@ where
pub(crate) fn new(
stream: T,
config: ServiceConfig,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
on_connect_data: Extensions,
services: Rc<RefCell<HttpFlow<S, X, U>>>,
on_connect_data: OnConnectData,
peer_addr: Option<net::SocketAddr>,
) -> Self {
Dispatcher::with_timeout(
@@ -192,9 +187,7 @@ where
config,
BytesMut::with_capacity(HW_BUFFER_SIZE),
None,
service,
expect,
upgrade,
services,
on_connect_data,
peer_addr,
)
@@ -207,10 +200,8 @@ where
config: ServiceConfig,
read_buf: BytesMut,
timeout: Option<Sleep>,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
on_connect_data: Extensions,
services: Rc<RefCell<HttpFlow<S, X, U>>>,
on_connect_data: OnConnectData,
peer_addr: Option<net::SocketAddr>,
) -> Self {
let keepalive = config.keep_alive_enabled();
@@ -239,9 +230,7 @@ where
io: Some(io),
codec,
read_buf,
service,
expect,
upgrade,
services,
on_connect_data,
flags,
peer_addr,
@@ -395,7 +384,8 @@ where
Poll::Ready(Ok(req)) => {
self.as_mut().send_continue();
this = self.as_mut().project();
this.state.set(State::ServiceCall(this.service.call(req)));
let fut = this.services.borrow_mut().service.call(req);
this.state.set(State::ServiceCall(fut));
continue;
}
Poll::Ready(Err(e)) => {
@@ -483,12 +473,14 @@ where
// Handle `EXPECT: 100-Continue` header
if req.head().expect() {
// set dispatcher state so the future is pinned.
let task = self.as_mut().project().expect.call(req);
self.as_mut().project().state.set(State::ExpectCall(task));
let mut this = self.as_mut().project();
let task = this.services.borrow_mut().expect.call(req);
this.state.set(State::ExpectCall(task));
} else {
// the same as above.
let task = self.as_mut().project().service.call(req);
self.as_mut().project().state.set(State::ServiceCall(task));
let mut this = self.as_mut().project();
let task = this.services.borrow_mut().service.call(req);
this.state.set(State::ServiceCall(task));
};
// eagerly poll the future for once(or twice if expect is resolved immediately).
@@ -499,8 +491,9 @@ where
// expect is resolved. continue loop and poll the service call branch.
Poll::Ready(Ok(req)) => {
self.as_mut().send_continue();
let task = self.as_mut().project().service.call(req);
self.as_mut().project().state.set(State::ServiceCall(task));
let mut this = self.as_mut().project();
let task = this.services.borrow_mut().service.call(req);
this.state.set(State::ServiceCall(task));
continue;
}
// future is pending. return Ok(()) to notify that a new state is
@@ -568,9 +561,11 @@ where
req.head_mut().peer_addr = *this.peer_addr;
// merge on_connect_ext data into request extensions
req.extensions_mut().drain_from(this.on_connect_data);
this.on_connect_data.merge_into(&mut req);
if pl == MessageType::Stream && this.upgrade.is_some() {
if pl == MessageType::Stream
&& this.services.borrow().upgrade.is_some()
{
this.messages.push_back(DispatcherMessage::Upgrade(req));
break;
}
@@ -834,12 +829,17 @@ where
);
parts.write_buf = mem::take(inner_p.write_buf);
let framed = Framed::from_parts(parts);
let upgrade =
inner_p.upgrade.take().unwrap().call((req, framed));
let upgrade = inner_p
.services
.borrow_mut()
.upgrade
.take()
.unwrap()
.call((req, framed));
self.as_mut()
.project()
.inner
.set(DispatcherState::Upgrade(Box::pin(upgrade)));
.set(DispatcherState::Upgrade(upgrade));
return self.poll(cx);
}
@@ -890,7 +890,7 @@ where
}
}
}
DispatcherStateProj::Upgrade(fut) => fut.as_mut().poll(cx).map_err(|e| {
DispatcherStateProj::Upgrade(fut) => fut.poll(cx).map_err(|e| {
error!("Upgrade handler error: {}", e);
DispatchError::Upgrade
}),
@@ -1028,13 +1028,13 @@ mod tests {
lazy(|cx| {
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
let services = HttpFlow::new(ok_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
ServiceConfig::default(),
CloneableService::new(ok_service()),
CloneableService::new(ExpectHandler),
None,
Extensions::new(),
services,
OnConnectData::default(),
None,
);
@@ -1068,13 +1068,13 @@ mod tests {
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
cfg,
CloneableService::new(echo_path_service()),
CloneableService::new(ExpectHandler),
None,
Extensions::new(),
services,
OnConnectData::default(),
None,
);
@@ -1122,13 +1122,13 @@ mod tests {
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
cfg,
CloneableService::new(echo_path_service()),
CloneableService::new(ExpectHandler),
None,
Extensions::new(),
services,
OnConnectData::default(),
None,
);
@@ -1171,13 +1171,14 @@ mod tests {
lazy(|cx| {
let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
CloneableService::new(echo_payload_service()),
CloneableService::new(ExpectHandler),
None,
Extensions::new(),
services,
OnConnectData::default(),
None,
);
@@ -1242,13 +1243,14 @@ mod tests {
lazy(|cx| {
let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
CloneableService::new(echo_path_service()),
CloneableService::new(ExpectHandler),
None,
Extensions::new(),
services,
OnConnectData::default(),
None,
);
@@ -1301,13 +1303,15 @@ mod tests {
lazy(|cx| {
let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
let services =
HttpFlow::new(ok_service(), ExpectHandler, Some(UpgradeHandler));
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
CloneableService::new(ok_service()),
CloneableService::new(ExpectHandler),
Some(CloneableService::new(UpgradeHandler)),
Extensions::new(),
services,
OnConnectData::default(),
None,
);