1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-25 06:39:22 +02:00

add connection level data container (#2491)

This commit is contained in:
Rob Ede
2021-12-07 17:23:34 +00:00
committed by GitHub
parent 069cf2da07
commit d35b7644dc
18 changed files with 152 additions and 125 deletions

View File

@ -1,6 +1,6 @@
use std::{
any::{Any, TypeId},
fmt, mem,
fmt,
};
use ahash::AHashMap;
@ -10,8 +10,7 @@ use ahash::AHashMap;
/// All entries into this map must be owned types (or static references).
#[derive(Default)]
pub struct Extensions {
/// Use FxHasher with a std HashMap with for faster
/// lookups on the small `TypeId` (u64 equivalent) keys.
/// Use AHasher with a std HashMap with for faster lookups on the small `TypeId` keys.
map: AHashMap<TypeId, Box<dyn Any>>,
}
@ -123,11 +122,6 @@ impl Extensions {
pub fn extend(&mut self, other: Extensions) {
self.map.extend(other.map);
}
/// Sets (or overrides) items from `other` into this map.
pub(crate) fn drain_from(&mut self, other: &mut Self) {
self.map.extend(mem::take(&mut other.map));
}
}
impl fmt::Debug for Extensions {
@ -179,6 +173,8 @@ mod tests {
#[test]
fn test_integers() {
static A: u32 = 8;
let mut map = Extensions::new();
map.insert::<i8>(8);
@ -191,6 +187,7 @@ mod tests {
map.insert::<u32>(32);
map.insert::<u64>(64);
map.insert::<u128>(128);
map.insert::<&'static u32>(&A);
assert!(map.get::<i8>().is_some());
assert!(map.get::<i16>().is_some());
assert!(map.get::<i32>().is_some());
@ -201,6 +198,7 @@ mod tests {
assert!(map.get::<u32>().is_some());
assert!(map.get::<u64>().is_some());
assert!(map.get::<u128>().is_some());
assert!(map.get::<&'static u32>().is_some());
}
#[test]
@ -279,27 +277,4 @@ mod tests {
assert_eq!(extensions.get(), Some(&20u8));
assert_eq!(extensions.get_mut(), Some(&mut 20u8));
}
#[test]
fn test_drain_from() {
let mut ext = Extensions::new();
ext.insert(2isize);
let mut more_ext = Extensions::new();
more_ext.insert(5isize);
more_ext.insert(5usize);
assert_eq!(ext.get::<isize>(), Some(&2isize));
assert_eq!(ext.get::<usize>(), None);
assert_eq!(more_ext.get::<isize>(), Some(&5isize));
assert_eq!(more_ext.get::<usize>(), Some(&5usize));
ext.drain_from(&mut more_ext);
assert_eq!(ext.get::<isize>(), Some(&5isize));
assert_eq!(ext.get::<usize>(), Some(&5usize));
assert_eq!(more_ext.get::<isize>(), None);
assert_eq!(more_ext.get::<usize>(), None);
}
}

View File

@ -22,7 +22,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
OnConnectData, Request, Response, StatusCode,
Extensions, OnConnectData, Request, Response, StatusCode,
};
use super::{
@ -100,9 +100,9 @@ where
U::Error: fmt::Display,
{
flow: Rc<HttpFlow<S, X, U>>,
on_connect_data: OnConnectData,
flags: Flags,
peer_addr: Option<net::SocketAddr>,
conn_data: Option<Rc<Extensions>>,
error: Option<DispatchError>,
#[pin]
@ -179,10 +179,10 @@ where
/// Create HTTP/1 dispatcher.
pub(crate) fn new(
io: T,
config: ServiceConfig,
flow: Rc<HttpFlow<S, X, U>>,
on_connect_data: OnConnectData,
config: ServiceConfig,
peer_addr: Option<net::SocketAddr>,
conn_data: OnConnectData,
) -> Self {
let flags = if config.keep_alive_enabled() {
Flags::KEEPALIVE
@ -198,20 +198,23 @@ where
Dispatcher {
inner: DispatcherState::Normal(InnerDispatcher {
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
payload: None,
state: State::None,
error: None,
messages: VecDeque::new(),
io: Some(io),
codec: Codec::new(config),
flow,
on_connect_data,
flags,
peer_addr,
conn_data: conn_data.0.map(Rc::new),
error: None,
state: State::None,
payload: None,
messages: VecDeque::new(),
ka_expire,
ka_timer,
io: Some(io),
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
codec: Codec::new(config),
}),
#[cfg(test)]
@ -593,8 +596,7 @@ where
Message::Item(mut req) => {
req.head_mut().peer_addr = *this.peer_addr;
// merge on_connect_ext data into request extensions
this.on_connect_data.merge_into(&mut req);
req.conn_data = this.conn_data.as_ref().map(Rc::clone);
match this.codec.message_type() {
// Request is upgradable. add upgrade message and break.
@ -1100,10 +1102,10 @@ mod tests {
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
ServiceConfig::default(),
services,
OnConnectData::default(),
ServiceConfig::default(),
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
@ -1140,10 +1142,10 @@ mod tests {
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
cfg,
services,
OnConnectData::default(),
cfg,
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
@ -1194,10 +1196,10 @@ mod tests {
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
cfg,
services,
OnConnectData::default(),
cfg,
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
@ -1244,10 +1246,10 @@ mod tests {
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
services,
OnConnectData::default(),
cfg,
None,
OnConnectData::default(),
);
buf.extend_read_buf(
@ -1316,10 +1318,10 @@ mod tests {
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
services,
OnConnectData::default(),
cfg,
None,
OnConnectData::default(),
);
buf.extend_read_buf(
@ -1393,10 +1395,10 @@ mod tests {
let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new(
buf.clone(),
cfg,
services,
OnConnectData::default(),
cfg,
None,
OnConnectData::default(),
);
buf.extend_read_buf(

View File

@ -365,15 +365,7 @@ where
}
fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {
let on_connect_data =
OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
Dispatcher::new(
io,
self.cfg.clone(),
self.flow.clone(),
on_connect_data,
addr,
)
let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
Dispatcher::new(io, self.flow.clone(), self.cfg.clone(), addr, conn_data)
}
}

View File

@ -27,7 +27,7 @@ use crate::{
body::{BodySize, BoxBody, MessageBody},
config::ServiceConfig,
service::HttpFlow,
OnConnectData, Payload, Request, Response, ResponseHead,
Extensions, OnConnectData, Payload, Request, Response, ResponseHead,
};
const CHUNK_SIZE: usize = 16_384;
@ -37,7 +37,7 @@ pin_project! {
pub struct Dispatcher<T, S, B, X, U> {
flow: Rc<HttpFlow<S, X, U>>,
connection: Connection<T, Bytes>,
on_connect_data: OnConnectData,
conn_data: Option<Rc<Extensions>>,
config: ServiceConfig,
peer_addr: Option<net::SocketAddr>,
ping_pong: Option<H2PingPong>,
@ -50,11 +50,11 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn new(
flow: Rc<HttpFlow<S, X, U>>,
mut conn: Connection<T, Bytes>,
on_connect_data: OnConnectData,
flow: Rc<HttpFlow<S, X, U>>,
config: ServiceConfig,
peer_addr: Option<net::SocketAddr>,
conn_data: OnConnectData,
timer: Option<Pin<Box<Sleep>>>,
) -> Self {
let ping_pong = config.keep_alive().map(|dur| H2PingPong {
@ -74,7 +74,7 @@ where
config,
peer_addr,
connection: conn,
on_connect_data,
conn_data: conn_data.0.map(Rc::new),
ping_pong,
_phantom: PhantomData,
}
@ -119,8 +119,7 @@ where
head.headers = parts.headers.into();
head.peer_addr = this.peer_addr;
// merge on_connect_ext data into request extensions
this.on_connect_data.merge_into(&mut req);
req.conn_data = this.conn_data.as_ref().map(Rc::clone);
let fut = this.flow.service.call(req);
let config = this.config.clone();

View File

@ -1,7 +1,7 @@
use std::{
future::Future,
marker::PhantomData,
net,
mem, net,
pin::Pin,
rc::Rc,
task::{Context, Poll},
@ -339,21 +339,24 @@ where
ref mut srv,
ref mut config,
ref peer_addr,
ref mut on_connect_data,
ref mut conn_data,
ref mut handshake,
) => match ready!(Pin::new(handshake).poll(cx)) {
Ok((conn, timer)) => {
let on_connect_data = std::mem::take(on_connect_data);
let on_connect_data = mem::take(conn_data);
self.state = State::Incoming(Dispatcher::new(
srv.take().unwrap(),
conn,
on_connect_data,
srv.take().unwrap(),
config.take().unwrap(),
*peer_addr,
on_connect_data,
timer,
));
self.poll(cx)
}
Err(err) => {
trace!("H2 handshake error: {}", err);
Poll::Ready(Err(err))

View File

@ -92,19 +92,11 @@ impl OnConnectData {
on_connect_ext: Option<&ConnectCallback<T>>,
) -> Self {
let ext = on_connect_ext.map(|handler| {
let mut extensions = Extensions::new();
let mut extensions = Extensions::default();
handler(io, &mut extensions);
extensions
});
Self(ext)
}
/// Merge self into given request's extensions.
#[inline]
pub(crate) fn merge_into(&mut self, req: &mut Request) {
if let Some(ref mut ext) = self.0 {
req.head.extensions.get_mut().drain_from(ext);
}
}
}

View File

@ -2,7 +2,9 @@
use std::{
cell::{Ref, RefMut},
fmt, net, str,
fmt, net,
rc::Rc,
str,
};
use http::{header, Method, Uri, Version};
@ -19,6 +21,7 @@ use crate::{
pub struct Request<P = PayloadStream> {
pub(crate) payload: Payload<P>,
pub(crate) head: Message<RequestHead>,
pub(crate) conn_data: Option<Rc<Extensions>>,
}
impl<P> HttpMessage for Request<P> {
@ -51,6 +54,7 @@ impl From<Message<RequestHead>> for Request<PayloadStream> {
Request {
head,
payload: Payload::None,
conn_data: None,
}
}
}
@ -61,6 +65,7 @@ impl Request<PayloadStream> {
Request {
head: Message::new(),
payload: Payload::None,
conn_data: None,
}
}
}
@ -71,16 +76,19 @@ impl<P> Request<P> {
Request {
payload,
head: Message::new(),
conn_data: None,
}
}
/// Create new Request instance
pub fn replace_payload<P1>(self, payload: Payload<P1>) -> (Request<P1>, Payload<P>) {
let pl = self.payload;
(
Request {
payload,
head: self.head,
conn_data: self.conn_data,
},
pl,
)
@ -170,6 +178,26 @@ impl<P> Request<P> {
pub fn peer_addr(&self) -> Option<net::SocketAddr> {
self.head().peer_addr
}
/// Returns a reference a piece of connection data set in an [on-connect] callback.
///
/// ```ignore
/// let opt_t = req.conn_data::<PeerCertificate>();
/// ```
///
/// [on-connect]: crate::HttpServiceBuilder::on_connect_ext
pub fn conn_data<T: 'static>(&self) -> Option<&T> {
self.conn_data
.as_deref()
.and_then(|container| container.get::<T>())
}
/// Returns the connection data container if an [on-connect] callback was registered.
///
/// [on-connect]: crate::HttpServiceBuilder::on_connect_ext
pub fn take_conn_data(&mut self) -> Option<Rc<Extensions>> {
self.conn_data.take()
}
}
impl<P> fmt::Debug for Request<P> {

View File

@ -507,8 +507,7 @@ where
&self,
(io, proto, peer_addr): (T, Protocol, Option<net::SocketAddr>),
) -> Self::Future {
let on_connect_data =
OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
match proto {
Protocol::Http2 => HttpServiceHandlerResponse {
@ -517,7 +516,7 @@ where
h2::handshake_with_timeout(io, &self.cfg),
self.cfg.clone(),
self.flow.clone(),
on_connect_data,
conn_data,
peer_addr,
)),
},
@ -527,10 +526,10 @@ where
state: State::H1 {
dispatcher: h1::Dispatcher::new(
io,
self.cfg.clone(),
self.flow.clone(),
on_connect_data,
self.cfg.clone(),
peer_addr,
conn_data,
),
},
},
@ -627,17 +626,12 @@ where
StateProj::H2Handshake { handshake: data } => {
match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) {
Ok((conn, timer)) => {
let (_, config, flow, on_connect_data, peer_addr) =
let (_, config, flow, conn_data, peer_addr) =
data.take().unwrap();
self.as_mut().project().state.set(State::H2 {
dispatcher: h2::Dispatcher::new(
flow,
conn,
on_connect_data,
config,
peer_addr,
timer,
conn, flow, config, peer_addr, conn_data, timer,
),
});
self.poll(cx)