diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 044483f4..47c6ed99 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -23,9 +23,11 @@ pub struct FramedNewService { impl FramedNewService where S: NewService, Response = Response>, - S::Service: 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S::Error: 'static, + >>::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { pub fn new>>(factory: F1) -> Self { @@ -51,9 +53,11 @@ where impl NewService> for FramedNewService where S: NewService, Response = Response> + Clone, - S::Service: 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S::Error: 'static, + >>::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { type Response = FramedTransport; @@ -90,9 +94,11 @@ where impl Service> for FramedService where S: NewService, Response = Response>, - S::Service: 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S::Error: 'static, + >>::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { type Response = FramedTransport; @@ -116,9 +122,11 @@ where pub struct FramedServiceResponseFuture where S: NewService, Response = Response>, - S::Service: 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S::Error: 'static, + >>::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { fut: S::Future, @@ -128,9 +136,11 @@ where impl Future for FramedServiceResponseFuture where S: NewService, Response = Response>, - S::Service: 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S::Error: 'static, + >>::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { type Item = FramedTransport; @@ -164,13 +174,18 @@ impl From for FramedTransportError { /// and pass then to the service. pub struct FramedTransport where - S: Service, Response = Response> + 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Encoder + Decoder + 'static, + S: Service, Response = Response>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Encoder + Decoder, + ::Item: 'static, ::Error: std::fmt::Debug, { - inner: Cell>, - inner2: Cell>, + service: S, + state: TransportState, + framed: Framed, + inner: Cell::Item, S::Error>>, } enum TransportState>, U: Encoder + Decoder> { @@ -180,28 +195,22 @@ enum TransportState>, U: Encoder + Decoder> { Stopping, } -struct FramedTransportInner -where - S: Service, Response = Response> + 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Encoder + Decoder + 'static, - ::Error: std::fmt::Debug, -{ - service: S, - state: TransportState, - framed: Framed, - buf: VecDeque, S::Error>>, +struct FramedTransportInner { + buf: VecDeque>, task: AtomicTask, } -impl FramedTransportInner +impl FramedTransport where - S: Service, Response = Response> + 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S: Service, Response = Response>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { - fn poll_service(&mut self, cell: &Cell>) -> bool { + fn poll_service(&mut self) -> bool { loop { match self.service.poll_ready() { Ok(Async::Ready(_)) => loop { @@ -219,8 +228,8 @@ where } }; - self.task.register(); - let mut cell = cell.clone(); + let mut cell = self.inner.clone(); + cell.get_mut().task.register(); tokio_current_thread::spawn(self.service.call(item).then(move |item| { let inner = cell.get_mut(); inner.buf.push_back(item); @@ -239,9 +248,10 @@ where /// write to sink fn poll_response(&mut self) -> bool { + let inner = self.inner.get_mut(); loop { while !self.framed.is_write_buf_full() { - if let Some(msg) = self.buf.pop_front() { + if let Some(msg) = inner.buf.pop_front() { match msg { Ok(msg) => { if let Err(err) = self.framed.force_send(msg) { @@ -284,78 +294,79 @@ where impl FramedTransport where - S: Service, Response = Response> + 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S: Service, Response = Response>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { pub fn new>>(framed: Framed, service: F) -> Self { - let inner = Cell::new(FramedTransportInner { + FramedTransport { framed, service: service.into_service(), state: TransportState::Processing, - buf: VecDeque::new(), - task: AtomicTask::new(), - }); - - FramedTransport { - inner2: inner.clone(), - inner, + inner: Cell::new(FramedTransportInner { + buf: VecDeque::new(), + task: AtomicTask::new(), + }), } } /// Get reference to a service wrapped by `FramedTransport` instance. pub fn get_ref(&self) -> &S { - &self.inner.get_ref().service + &self.service } /// Get mutable reference to a service wrapped by `FramedTransport` /// instance. pub fn get_mut(&mut self) -> &mut S { - &mut self.inner.get_mut().service + &mut self.service } /// Get reference to a framed instance wrapped by `FramedTransport` /// instance. pub fn get_framed(&self) -> &Framed { - &self.inner.get_ref().framed + &self.framed } /// Get mutable reference to a framed instance wrapped by `FramedTransport` /// instance. pub fn get_framed_mut(&mut self) -> &mut Framed { - &mut self.inner.get_mut().framed + &mut self.framed } } impl Future for FramedTransport where - S: Service, Response = Response> + 'static, - T: AsyncRead + AsyncWrite + 'static, - U: Decoder + Encoder + 'static, + S: Service, Response = Response>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, ::Error: std::fmt::Debug, { type Item = (); type Error = FramedTransportError; fn poll(&mut self) -> Poll { - let inner = self.inner.get_mut(); - - match mem::replace(&mut inner.state, TransportState::Processing) { + match mem::replace(&mut self.state, TransportState::Processing) { TransportState::Processing => { - if inner.poll_service(&self.inner2) || inner.poll_response() { + if self.poll_service() || self.poll_response() { self.poll() } else { Ok(Async::NotReady) } } TransportState::Error(err) => { - if inner.framed.is_write_buf_empty() - || (inner.poll_response() || inner.framed.is_write_buf_empty()) + if self.framed.is_write_buf_empty() + || (self.poll_response() || self.framed.is_write_buf_empty()) { Err(err) } else { - inner.state = TransportState::Error(err); + self.state = TransportState::Error(err); Ok(Async::NotReady) } }