From 0c1f5f9edc581878da5a6c1d4b40a90ed22d1b99 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 9 Dec 2019 17:40:15 +0600 Subject: [PATCH] Check Upgrade service readiness before calling it --- actix-http/CHANGES.md | 5 ++++ actix-http/src/h1/dispatcher.rs | 42 +++++++++++++++++++++++++-------- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index b15a0c65..fc1eea56 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## [1.0.0-alpha.5] - 2019-12-xx + +### Fixed + +* Check `Upgrade` service readiness before calling it ### Changed diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 7276d5a3..3bcf1137 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -66,6 +66,7 @@ where U::Error: fmt::Display, { Normal(InnerDispatcher), + UpgradeReadiness(InnerDispatcher, Request), Upgrade(U::Future), None, } @@ -761,16 +762,8 @@ where if let DispatcherState::Normal(inner) = std::mem::replace(&mut self.inner, DispatcherState::None) { - let mut parts = FramedParts::with_read_buf( - inner.io, - inner.codec, - inner.read_buf, - ); - parts.write_buf = inner.write_buf; - let framed = Framed::from_parts(parts); - self.inner = DispatcherState::Upgrade( - inner.upgrade.unwrap().call((req, framed)), - ); + self.inner = + DispatcherState::UpgradeReadiness(inner, req); return self.poll(cx); } else { panic!() @@ -820,6 +813,35 @@ where } } } + DispatcherState::UpgradeReadiness(ref mut inner, _) => { + let upgrade = inner.upgrade.as_mut().unwrap(); + match upgrade.poll_ready(cx) { + Poll::Ready(Ok(_)) => { + if let DispatcherState::UpgradeReadiness(inner, req) = + std::mem::replace(&mut self.inner, DispatcherState::None) + { + let mut parts = FramedParts::with_read_buf( + inner.io, + inner.codec, + inner.read_buf, + ); + parts.write_buf = inner.write_buf; + let framed = Framed::from_parts(parts); + self.inner = DispatcherState::Upgrade( + inner.upgrade.unwrap().call((req, framed)), + ); + self.poll(cx) + } else { + panic!() + } + } + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + error!("Upgrade handler readiness check error: {}", e); + Poll::Ready(Err(DispatchError::Upgrade)) + } + } + } DispatcherState::Upgrade(ref mut fut) => { unsafe { Pin::new_unchecked(fut) }.poll(cx).map_err(|e| { error!("Upgrade handler error: {}", e);