diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs index 3302e141..406066f2 100644 --- a/actix-ioframe/src/connect.rs +++ b/actix-ioframe/src/connect.rs @@ -6,49 +6,75 @@ use crate::dispatcher::FramedMessage; use crate::sink::Sink; pub struct Connect { - io: Io, - codec: Codec, + io: IoItem, state: St, - // rx: mpsc::UnboundedReceiver::Item>>, - // sink: Sink<::Item>, +} + +enum IoItem { + Io(Io), + Framed(Framed), +} + +impl IoItem +where + Io: AsyncRead + AsyncWrite, +{ + fn into_codec(self, codec: Codec) -> IoItem + where + Codec: Encoder + Decoder, + { + match self { + IoItem::Io(io) => IoItem::Framed(Framed::new(io, codec)), + IoItem::Framed(framed) => IoItem::Framed(framed.into_framed(codec)), + } + } + + fn as_framed(&mut self) -> &mut Framed + where + C: Encoder + Decoder, + { + match self { + IoItem::Io(_) => panic!("Codec is not set"), + IoItem::Framed(ref mut framed) => framed, + } + } + + fn into_framed(self) -> Framed + where + C: Encoder + Decoder, + { + match self { + IoItem::Io(_) => panic!("Codec is not set"), + IoItem::Framed(framed) => framed, + } + } } impl Connect { pub(crate) fn new(io: Io) -> Self { Self { - io, - codec: (), + io: IoItem::Io(io), state: (), } } } -impl Connect { - pub fn codec(self, codec: Codec) -> Connect { +impl Connect +where + Io: AsyncRead + AsyncWrite, +{ + pub fn codec(self, codec: Codec) -> Connect + where + Codec: Encoder + Decoder, + { Connect { - codec, - io: self.io, + io: self.io.into_codec(codec), state: self.state, } } pub fn state(self, state: St) -> Connect { - Connect { - state, - io: self.io, - codec: self.codec, - } - } - - pub fn state_fn(self, f: F) -> Connect - where - F: FnOnce(&Connect) -> St, - { - Connect { - state: f(&self), - io: self.io, - codec: self.codec, - } + Connect { state, io: self.io } } } @@ -63,13 +89,50 @@ where ConnectResult { state: Cell::new(self.state), - framed: Framed::new(self.io, self.codec), + framed: self.io.into_framed(), rx, sink, } } } +impl futures::Stream for Connect +where + Codec: Encoder + Decoder, + Io: AsyncRead + AsyncWrite, +{ + type Item = ::Item; + type Error = ::Error; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + self.io.as_framed().poll() + } +} + +impl futures::Sink for Connect +where + Codec: Encoder + Decoder, + Io: AsyncRead + AsyncWrite, +{ + type SinkItem = ::Item; + type SinkError = ::Error; + + fn start_send( + &mut self, + item: Self::SinkItem, + ) -> futures::StartSend { + self.io.as_framed().start_send(item) + } + + fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { + self.io.as_framed().poll_complete() + } + + fn close(&mut self) -> futures::Poll<(), Self::SinkError> { + self.io.as_framed().close() + } +} + pub struct ConnectResult { pub(crate) state: Cell, pub(crate) framed: Framed, diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs index 9819c9d1..8ca9a964 100644 --- a/actix-ioframe/src/service.rs +++ b/actix-ioframe/src/service.rs @@ -193,7 +193,6 @@ pub struct FramedServiceImpl { impl Service for FramedServiceImpl where - // St: 'static, Io: AsyncRead + AsyncWrite, C: Service, Response = ConnectResult>, C::Error: 'static,