mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-24 16:02:59 +01:00
simplify channels list management
This commit is contained in:
parent
f456be0309
commit
058630d041
@ -18,15 +18,6 @@ enum HttpProtocol<T: IoStream, H: 'static> {
|
||||
Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut),
|
||||
}
|
||||
|
||||
impl<T: IoStream, H: 'static> HttpProtocol<T, H> {
|
||||
fn is_unknown(&self) -> bool {
|
||||
match *self {
|
||||
HttpProtocol::Unknown(_, _, _, _) => true,
|
||||
_ => false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ProtocolKind {
|
||||
Http1,
|
||||
Http2,
|
||||
@ -44,15 +35,14 @@ impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
|
||||
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
|
||||
{
|
||||
settings.add_channel();
|
||||
|
||||
if http2 {
|
||||
HttpChannel {
|
||||
node: None,
|
||||
proto: Some(HttpProtocol::H2(
|
||||
node: None, proto: Some(HttpProtocol::H2(
|
||||
h2::Http2::new(settings, io, peer, Bytes::new()))) }
|
||||
} else {
|
||||
HttpChannel {
|
||||
node: None,
|
||||
proto: Some(HttpProtocol::Unknown(
|
||||
node: None, proto: Some(HttpProtocol::Unknown(
|
||||
settings, peer, io, BytesMut::with_capacity(4096))) }
|
||||
}
|
||||
}
|
||||
@ -78,15 +68,18 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if !self.proto.as_ref().map(|p| p.is_unknown()).unwrap_or(false) && self.node.is_none() {
|
||||
self.node = Some(Node::new(self));
|
||||
match self.proto {
|
||||
if !self.node.is_none() {
|
||||
let el = self as *mut _;
|
||||
self.node = Some(Node::new(el));
|
||||
let _ = match self.proto {
|
||||
Some(HttpProtocol::H1(ref mut h1)) =>
|
||||
h1.settings().head().insert(self.node.as_ref().unwrap()),
|
||||
self.node.as_ref().map(|n| h1.settings().head().insert(n)),
|
||||
Some(HttpProtocol::H2(ref mut h2)) =>
|
||||
h2.settings().head().insert(self.node.as_ref().unwrap()),
|
||||
_ => (),
|
||||
}
|
||||
self.node.as_ref().map(|n| h2.settings().head().insert(n)),
|
||||
Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) =>
|
||||
self.node.as_ref().map(|n| settings.head().insert(n)),
|
||||
None => unreachable!(),
|
||||
};
|
||||
}
|
||||
|
||||
let kind = match self.proto {
|
||||
@ -95,7 +88,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
||||
match result {
|
||||
Ok(Async::Ready(())) | Err(_) => {
|
||||
h1.settings().remove_channel();
|
||||
self.node.as_mut().unwrap().remove();
|
||||
self.node.as_mut().map(|n| n.remove());
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
@ -106,7 +99,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
||||
match result {
|
||||
Ok(Async::Ready(())) | Err(_) => {
|
||||
h2.settings().remove_channel();
|
||||
self.node.as_mut().unwrap().remove();
|
||||
self.node.as_mut().map(|n| n.remove());
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
@ -117,6 +110,7 @@ impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'sta
|
||||
Ok(Async::Ready(0)) | Err(_) => {
|
||||
debug!("Ignored premature client disconnection");
|
||||
settings.remove_channel();
|
||||
self.node.as_mut().map(|n| n.remove());
|
||||
return Err(())
|
||||
},
|
||||
_ => (),
|
||||
@ -163,11 +157,11 @@ pub(crate) struct Node<T>
|
||||
|
||||
impl<T> Node<T>
|
||||
{
|
||||
fn new(el: &mut T) -> Self {
|
||||
fn new(el: *mut T) -> Self {
|
||||
Node {
|
||||
next: None,
|
||||
prev: None,
|
||||
element: el as *mut _,
|
||||
element: el,
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,14 +181,13 @@ impl<T> Node<T>
|
||||
}
|
||||
|
||||
fn remove(&mut self) {
|
||||
#[allow(mutable_transmutes)]
|
||||
unsafe {
|
||||
let mut prev = self.prev.take();
|
||||
self.element = ptr::null_mut();
|
||||
let next = self.next.take();
|
||||
let mut prev = self.prev.take();
|
||||
|
||||
if let Some(ref mut prev) = prev {
|
||||
let p: &mut Node<()> = mem::transmute(prev.as_ref().unwrap());
|
||||
p.next = next;
|
||||
prev.as_mut().unwrap().next = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ pub(crate) struct WorkerSettings<H> {
|
||||
bytes: Rc<SharedBytesPool>,
|
||||
messages: Rc<helpers::SharedMessagePool>,
|
||||
channels: Cell<usize>,
|
||||
node: Node<()>,
|
||||
node: Box<Node<()>>,
|
||||
}
|
||||
|
||||
impl<H> WorkerSettings<H> {
|
||||
@ -75,7 +75,7 @@ impl<H> WorkerSettings<H> {
|
||||
bytes: Rc::new(SharedBytesPool::new()),
|
||||
messages: Rc::new(helpers::SharedMessagePool::new()),
|
||||
channels: Cell::new(0),
|
||||
node: Node::head(),
|
||||
node: Box::new(Node::head()),
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user