use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use futures::unsync::mpsc; use crate::cell::Cell; use crate::dispatcher::FramedMessage; use crate::sink::Sink; pub struct Connect { io: Io, codec: Codec, state: St, // rx: mpsc::UnboundedReceiver::Item>>, // sink: Sink<::Item>, } impl Connect { pub(crate) fn new(io: Io) -> Self { Self { io, codec: (), state: (), } } } impl Connect { pub fn codec(self, codec: Codec) -> Connect { Connect { codec, io: self.io, state: self.state, } } pub fn state(self, state: St) -> Connect { Connect { state, io: self.io, codec: self.codec, } } pub fn state_fn(self, f: F) -> Connect where F: FnOnce(&Connect) -> St, { Connect { state: f(&self), io: self.io, codec: self.codec, } } } impl Connect where C: Encoder + Decoder, Io: AsyncRead + AsyncWrite, { pub fn into_result(self) -> ConnectResult { let (tx, rx) = mpsc::unbounded(); let sink = Sink::new(tx); ConnectResult { state: Cell::new(self.state), framed: Framed::new(self.io, self.codec), rx, sink, } } } pub struct ConnectResult { pub(crate) state: Cell, pub(crate) framed: Framed, pub(crate) rx: mpsc::UnboundedReceiver::Item>>, pub(crate) sink: Sink<::Item>, } impl ConnectResult { #[inline] pub fn sink(&self) -> &Sink<::Item> { &self.sink } } impl futures::Stream for ConnectResult where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, { type Item = ::Item; type Error = ::Error; fn poll(&mut self) -> futures::Poll, Self::Error> { self.framed.poll() } } impl futures::Sink for ConnectResult where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, { type SinkItem = ::Item; type SinkError = ::Error; fn start_send( &mut self, item: Self::SinkItem, ) -> futures::StartSend { self.framed.start_send(item) } fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { self.framed.poll_complete() } fn close(&mut self) -> futures::Poll<(), Self::SinkError> { self.framed.close() } }