From 7e135b798b1f20368b62a9d2f7f03dbbc361bffe Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 5 Oct 2018 14:30:40 -0700 Subject: [PATCH] add websocket transport and test --- src/framed/framed.rs | 283 ------------------------------------- src/framed/framed_read.rs | 216 ---------------------------- src/framed/framed_write.rs | 243 ------------------------------- src/framed/mod.rs | 32 ----- src/h1/dispatcher.rs | 4 +- src/lib.rs | 3 - src/ws/mod.rs | 4 +- src/ws/transport.rs | 50 +++++++ tests/test_ws.rs | 111 +++++++++++++++ 9 files changed, 165 insertions(+), 781 deletions(-) delete mode 100644 src/framed/framed.rs delete mode 100644 src/framed/framed_read.rs delete mode 100644 src/framed/framed_write.rs delete mode 100644 src/framed/mod.rs create mode 100644 src/ws/transport.rs create mode 100644 tests/test_ws.rs diff --git a/src/framed/framed.rs b/src/framed/framed.rs deleted file mode 100644 index f6295d98..00000000 --- a/src/framed/framed.rs +++ /dev/null @@ -1,283 +0,0 @@ -#![allow(deprecated)] - -use std::fmt; -use std::io::{self, Read, Write}; - -use bytes::BytesMut; -use futures::{Poll, Sink, StartSend, Stream}; -use tokio_codec::{Decoder, Encoder}; -use tokio_io::{AsyncRead, AsyncWrite}; - -use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; -use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; - -/// A unified `Stream` and `Sink` interface to an underlying I/O object, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. -pub struct Framed { - inner: FramedRead2>>, -} - -pub struct Fuse(pub T, pub U); - -impl Framed -where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, -{ - /// Provides a `Stream` and `Sink` interface for reading and writing to this - /// `Io` object, using `Decode` and `Encode` to read and write the raw data. - /// - /// Raw I/O objects work with byte sequences, but higher-level code usually - /// wants to batch these into meaningful chunks, called "frames". This - /// method layers framing on top of an I/O object, by using the `Codec` - /// traits to handle encoding and decoding of messages frames. Note that - /// the incoming and outgoing frame types may be distinct. - /// - /// This function returns a *single* object that is both `Stream` and - /// `Sink`; grouping this into a single object is often useful for layering - /// things like gzip or TLS, which require both read and write access to the - /// underlying object. - /// - /// If you want to work more directly with the streams and sink, consider - /// calling `split` on the `Framed` returned by this method, which will - /// break them into separate objects, allowing them to interact more easily. - pub fn new(inner: T, codec: U) -> Framed { - Framed { - inner: framed_read2(framed_write2(Fuse(inner, codec))), - } - } -} - -impl Framed { - /// Provides a `Stream` and `Sink` interface for reading and writing to this - /// `Io` object, using `Decode` and `Encode` to read and write the raw data. - /// - /// Raw I/O objects work with byte sequences, but higher-level code usually - /// wants to batch these into meaningful chunks, called "frames". This - /// method layers framing on top of an I/O object, by using the `Codec` - /// traits to handle encoding and decoding of messages frames. Note that - /// the incoming and outgoing frame types may be distinct. - /// - /// This function returns a *single* object that is both `Stream` and - /// `Sink`; grouping this into a single object is often useful for layering - /// things like gzip or TLS, which require both read and write access to the - /// underlying object. - /// - /// This objects takes a stream and a readbuffer and a writebuffer. These field - /// can be obtained from an existing `Framed` with the `into_parts` method. - /// - /// If you want to work more directly with the streams and sink, consider - /// calling `split` on the `Framed` returned by this method, which will - /// break them into separate objects, allowing them to interact more easily. - pub fn from_parts(parts: FramedParts) -> Framed { - Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), - parts.read_buf, - ), - } - } - - /// Returns a reference to the underlying codec. - pub fn get_codec(&self) -> &U { - &self.inner.get_ref().get_ref().1 - } - - /// Returns a mutable reference to the underlying codec. - pub fn get_codec_mut(&mut self) -> &mut U { - &mut self.inner.get_mut().get_mut().1 - } - - /// Returns a reference to the underlying I/O stream wrapped by - /// `Frame`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_ref(&self) -> &T { - &self.inner.get_ref().get_ref().0 - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `Frame`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.get_mut().get_mut().0 - } - - /// Consumes the `Frame`, returning its underlying I/O stream. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn into_inner(self) -> T { - self.inner.into_inner().into_inner().0 - } - - /// Consumes the `Frame`, returning its underlying I/O stream, the buffer - /// with unprocessed data, and the codec. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn into_parts(self) -> FramedParts { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf) = inner.into_parts(); - - FramedParts { - io: inner.0, - codec: inner.1, - read_buf: read_buf, - write_buf: write_buf, - _priv: (), - } - } -} - -impl Stream for Framed -where - T: AsyncRead, - U: Decoder, -{ - type Item = U::Item; - type Error = U::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() - } -} - -impl Sink for Framed -where - T: AsyncWrite, - U: Encoder, - U::Error: From, -{ - type SinkItem = U::Item; - type SinkError = U::Error; - - fn start_send( - &mut self, item: Self::SinkItem, - ) -> StartSend { - self.inner.get_mut().start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().poll_complete() - } - - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().close() - } -} - -impl fmt::Debug for Framed -where - T: fmt::Debug, - U: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Framed") - .field("io", &self.inner.get_ref().get_ref().0) - .field("codec", &self.inner.get_ref().get_ref().1) - .finish() - } -} - -// ===== impl Fuse ===== - -impl Read for Fuse { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.0.read(dst) - } -} - -impl AsyncRead for Fuse { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } -} - -impl Write for Fuse { - fn write(&mut self, src: &[u8]) -> io::Result { - self.0.write(src) - } - - fn flush(&mut self) -> io::Result<()> { - self.0.flush() - } -} - -impl AsyncWrite for Fuse { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.0.shutdown() - } -} - -impl Decoder for Fuse { - type Item = U::Item; - type Error = U::Error; - - fn decode( - &mut self, buffer: &mut BytesMut, - ) -> Result, Self::Error> { - self.1.decode(buffer) - } - - fn decode_eof( - &mut self, buffer: &mut BytesMut, - ) -> Result, Self::Error> { - self.1.decode_eof(buffer) - } -} - -impl Encoder for Fuse { - type Item = U::Item; - type Error = U::Error; - - fn encode( - &mut self, item: Self::Item, dst: &mut BytesMut, - ) -> Result<(), Self::Error> { - self.1.encode(item, dst) - } -} - -/// `FramedParts` contains an export of the data of a Framed transport. -/// It can be used to construct a new `Framed` with a different codec. -/// It contains all current buffers and the inner transport. -#[derive(Debug)] -pub struct FramedParts { - /// The inner transport used to read bytes to and write bytes to - pub io: T, - - /// The codec - pub codec: U, - - /// The buffer with read but unprocessed data. - pub read_buf: BytesMut, - - /// A buffer with unprocessed data which are not written yet. - pub write_buf: BytesMut, - - /// This private field allows us to add additional fields in the future in a - /// backwards compatible way. - _priv: (), -} - -impl FramedParts { - /// Create a new, default, `FramedParts` - pub fn new(io: T, codec: U) -> FramedParts { - FramedParts { - io, - codec, - read_buf: BytesMut::new(), - write_buf: BytesMut::new(), - _priv: (), - } - } -} diff --git a/src/framed/framed_read.rs b/src/framed/framed_read.rs deleted file mode 100644 index 065e2920..00000000 --- a/src/framed/framed_read.rs +++ /dev/null @@ -1,216 +0,0 @@ -use std::fmt; - -use bytes::BytesMut; -use futures::{Async, Poll, Sink, StartSend, Stream}; -use tokio_codec::Decoder; -use tokio_io::AsyncRead; - -use super::framed::Fuse; - -/// A `Stream` of messages decoded from an `AsyncRead`. -pub struct FramedRead { - inner: FramedRead2>, -} - -pub struct FramedRead2 { - inner: T, - eof: bool, - is_readable: bool, - buffer: BytesMut, -} - -const INITIAL_CAPACITY: usize = 8 * 1024; - -// ===== impl FramedRead ===== - -impl FramedRead -where - T: AsyncRead, - D: Decoder, -{ - /// Creates a new `FramedRead` with the given `decoder`. - pub fn new(inner: T, decoder: D) -> FramedRead { - FramedRead { - inner: framed_read2(Fuse(inner, decoder)), - } - } -} - -impl FramedRead { - /// Returns a reference to the underlying I/O stream wrapped by - /// `FramedRead`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_ref(&self) -> &T { - &self.inner.inner.0 - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `FramedRead`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 - } - - /// Consumes the `FramedRead`, returning its underlying I/O stream. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn into_inner(self) -> T { - self.inner.inner.0 - } - - /// Returns a reference to the underlying decoder. - pub fn decoder(&self) -> &D { - &self.inner.inner.1 - } - - /// Returns a mutable reference to the underlying decoder. - pub fn decoder_mut(&mut self) -> &mut D { - &mut self.inner.inner.1 - } -} - -impl Stream for FramedRead -where - T: AsyncRead, - D: Decoder, -{ - type Item = D::Item; - type Error = D::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() - } -} - -impl Sink for FramedRead -where - T: Sink, -{ - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send( - &mut self, item: Self::SinkItem, - ) -> StartSend { - self.inner.inner.0.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.poll_complete() - } - - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.close() - } -} - -impl fmt::Debug for FramedRead -where - T: fmt::Debug, - D: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("FramedRead") - .field("inner", &self.inner.inner.0) - .field("decoder", &self.inner.inner.1) - .field("eof", &self.inner.eof) - .field("is_readable", &self.inner.is_readable) - .field("buffer", &self.inner.buffer) - .finish() - } -} - -// ===== impl FramedRead2 ===== - -pub fn framed_read2(inner: T) -> FramedRead2 { - FramedRead2 { - inner: inner, - eof: false, - is_readable: false, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } -} - -pub fn framed_read2_with_buffer(inner: T, mut buf: BytesMut) -> FramedRead2 { - if buf.capacity() < INITIAL_CAPACITY { - let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); - buf.reserve(bytes_to_reserve); - } - FramedRead2 { - inner: inner, - eof: false, - is_readable: buf.len() > 0, - buffer: buf, - } -} - -impl FramedRead2 { - pub fn get_ref(&self) -> &T { - &self.inner - } - - pub fn into_inner(self) -> T { - self.inner - } - - pub fn into_parts(self) -> (T, BytesMut) { - (self.inner, self.buffer) - } - - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner - } -} - -impl Stream for FramedRead2 -where - T: AsyncRead + Decoder, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - loop { - // Repeatedly call `decode` or `decode_eof` as long as it is - // "readable". Readable is defined as not having returned `None`. If - // the upstream has returned EOF, and the decoder is no longer - // readable, it can be assumed that the decoder will never become - // readable again, at which point the stream is terminated. - if self.is_readable { - if self.eof { - let frame = try!(self.inner.decode_eof(&mut self.buffer)); - return Ok(Async::Ready(frame)); - } - - trace!("attempting to decode a frame"); - - if let Some(frame) = try!(self.inner.decode(&mut self.buffer)) { - trace!("frame decoded from buffer"); - return Ok(Async::Ready(Some(frame))); - } - - self.is_readable = false; - } - - assert!(!self.eof); - - // Otherwise, try to read more data and try again. Make sure we've - // got room for at least one byte to read to ensure that we don't - // get a spurious 0 that looks like EOF - self.buffer.reserve(1); - if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { - self.eof = true; - } - - self.is_readable = true; - } - } -} diff --git a/src/framed/framed_write.rs b/src/framed/framed_write.rs deleted file mode 100644 index 310c7630..00000000 --- a/src/framed/framed_write.rs +++ /dev/null @@ -1,243 +0,0 @@ -use std::fmt; -use std::io::{self, Read}; - -use bytes::BytesMut; -use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; -use tokio_codec::{Decoder, Encoder}; -use tokio_io::{AsyncRead, AsyncWrite}; - -use super::framed::Fuse; - -/// A `Sink` of frames encoded to an `AsyncWrite`. -pub struct FramedWrite { - inner: FramedWrite2>, -} - -pub struct FramedWrite2 { - inner: T, - buffer: BytesMut, -} - -const INITIAL_CAPACITY: usize = 8 * 1024; -const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; - -impl FramedWrite -where - T: AsyncWrite, - E: Encoder, -{ - /// Creates a new `FramedWrite` with the given `encoder`. - pub fn new(inner: T, encoder: E) -> FramedWrite { - FramedWrite { - inner: framed_write2(Fuse(inner, encoder)), - } - } -} - -impl FramedWrite { - /// Returns a reference to the underlying I/O stream wrapped by - /// `FramedWrite`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_ref(&self) -> &T { - &self.inner.inner.0 - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `FramedWrite`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 - } - - /// Consumes the `FramedWrite`, returning its underlying I/O stream. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn into_inner(self) -> T { - self.inner.inner.0 - } - - /// Returns a reference to the underlying decoder. - pub fn encoder(&self) -> &E { - &self.inner.inner.1 - } - - /// Returns a mutable reference to the underlying decoder. - pub fn encoder_mut(&mut self) -> &mut E { - &mut self.inner.inner.1 - } -} - -impl Sink for FramedWrite -where - T: AsyncWrite, - E: Encoder, -{ - type SinkItem = E::Item; - type SinkError = E::Error; - - fn start_send(&mut self, item: E::Item) -> StartSend { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() - } - - fn close(&mut self) -> Poll<(), Self::SinkError> { - Ok(try!(self.inner.close())) - } -} - -impl Stream for FramedWrite -where - T: Stream, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.inner.0.poll() - } -} - -impl fmt::Debug for FramedWrite -where - T: fmt::Debug, - U: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("FramedWrite") - .field("inner", &self.inner.get_ref().0) - .field("encoder", &self.inner.get_ref().1) - .field("buffer", &self.inner.buffer) - .finish() - } -} - -// ===== impl FramedWrite2 ===== - -pub fn framed_write2(inner: T) -> FramedWrite2 { - FramedWrite2 { - inner: inner, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } -} - -pub fn framed_write2_with_buffer(inner: T, mut buf: BytesMut) -> FramedWrite2 { - if buf.capacity() < INITIAL_CAPACITY { - let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); - buf.reserve(bytes_to_reserve); - } - FramedWrite2 { - inner: inner, - buffer: buf, - } -} - -impl FramedWrite2 { - pub fn get_ref(&self) -> &T { - &self.inner - } - - pub fn into_inner(self) -> T { - self.inner - } - - pub fn into_parts(self) -> (T, BytesMut) { - (self.inner, self.buffer) - } - - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner - } -} - -impl Sink for FramedWrite2 -where - T: AsyncWrite + Encoder, -{ - type SinkItem = T::Item; - type SinkError = T::Error; - - fn start_send(&mut self, item: T::Item) -> StartSend { - // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's - // *still* over 8KiB, then apply backpressure (reject the send). - if self.buffer.len() >= BACKPRESSURE_BOUNDARY { - try!(self.poll_complete()); - - if self.buffer.len() >= BACKPRESSURE_BOUNDARY { - return Ok(AsyncSink::NotReady(item)); - } - } - - try!(self.inner.encode(item, &mut self.buffer)); - - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - trace!("flushing framed transport"); - - while !self.buffer.is_empty() { - trace!("writing; remaining={}", self.buffer.len()); - - let n = try_ready!(self.inner.poll_write(&self.buffer)); - - if n == 0 { - return Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to \ - write frame to transport", - ).into()); - } - - // TODO: Add a way to `bytes` to do this w/o returning the drained - // data. - let _ = self.buffer.split_to(n); - } - - // Try flushing the underlying IO - try_ready!(self.inner.poll_flush()); - - trace!("framed transport flushed"); - return Ok(Async::Ready(())); - } - - fn close(&mut self) -> Poll<(), Self::SinkError> { - try_ready!(self.poll_complete()); - Ok(try!(self.inner.shutdown())) - } -} - -impl Decoder for FramedWrite2 { - type Item = T::Item; - type Error = T::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, T::Error> { - self.inner.decode(src) - } - - fn decode_eof(&mut self, src: &mut BytesMut) -> Result, T::Error> { - self.inner.decode_eof(src) - } -} - -impl Read for FramedWrite2 { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.inner.read(dst) - } -} - -impl AsyncRead for FramedWrite2 { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } -} diff --git a/src/framed/mod.rs b/src/framed/mod.rs deleted file mode 100644 index cb0308fa..00000000 --- a/src/framed/mod.rs +++ /dev/null @@ -1,32 +0,0 @@ -//! Utilities for encoding and decoding frames. -//! -//! Contains adapters to go from streams of bytes, [`AsyncRead`] and -//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. -//! Framed streams are also known as [transports]. -//! -//! [`AsyncRead`]: # -//! [`AsyncWrite`]: # -//! [`Sink`]: # -//! [`Stream`]: # -//! [transports]: # - -#![deny(missing_docs, missing_debug_implementations, warnings)] -#![doc(hidden, html_root_url = "https://docs.rs/tokio-codec/0.1.0")] - -// _tokio_codec are the items that belong in the `tokio_codec` crate. However, because we need to -// maintain backward compatibility until the next major breaking change, they are defined here. -// When the next breaking change comes, they should be moved to the `tokio_codec` crate and become -// independent. -// -// The primary reason we can't move these to `tokio-codec` now is because, again for backward -// compatibility reasons, we need to keep `Decoder` and `Encoder` in tokio_io::codec. And `Decoder` -// and `Encoder` needs to reference `Framed`. So they all still need to still be in the same -// module. - -mod framed; -mod framed_read; -mod framed_write; - -pub use self::framed::{Framed, FramedParts}; -pub use self::framed_read::FramedRead; -pub use self::framed_write::FramedWrite; diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 728a78b9..f2001302 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -1,12 +1,11 @@ -// #![allow(unused_imports, unused_variables, dead_code)] use std::collections::VecDeque; use std::fmt::{Debug, Display}; use std::time::Instant; +use actix_net::codec::Framed; use actix_net::service::Service; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; -// use tokio_current_thread::spawn; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; @@ -16,7 +15,6 @@ use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use body::Body; use config::ServiceConfig; use error::DispatchError; -use framed::Framed; use request::Request; use response::Response; diff --git a/src/lib.rs b/src/lib.rs index 74e7ced7..9acdf3cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -135,9 +135,6 @@ mod request; mod response; mod uri; -#[doc(hidden)] -pub mod framed; - pub mod error; pub mod h1; pub(crate) mod helpers; diff --git a/src/ws/mod.rs b/src/ws/mod.rs index e8bf3870..bd657d94 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -15,10 +15,12 @@ mod codec; mod frame; mod mask; mod proto; +mod transport; -pub use self::codec::Message; +pub use self::codec::{Codec, Message}; pub use self::frame::Frame; pub use self::proto::{CloseCode, CloseReason, OpCode}; +pub use self::transport::Transport; /// Websocket protocol errors #[derive(Fail, Debug)] diff --git a/src/ws/transport.rs b/src/ws/transport.rs new file mode 100644 index 00000000..aabeb5d5 --- /dev/null +++ b/src/ws/transport.rs @@ -0,0 +1,50 @@ +use actix_net::codec::Framed; +use actix_net::framed::{FramedTransport, FramedTransportError}; +use actix_net::service::{IntoService, Service}; +use futures::{Future, Poll}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use super::{Codec, Message}; + +pub struct Transport +where + S: Service, + T: AsyncRead + AsyncWrite, +{ + inner: FramedTransport, +} + +impl Transport +where + T: AsyncRead + AsyncWrite, + S: Service, + S::Future: 'static, + S::Error: 'static, +{ + pub fn new>(io: T, service: F) -> Self { + Transport { + inner: FramedTransport::new(Framed::new(io, Codec::new()), service), + } + } + + pub fn with>(framed: Framed, service: F) -> Self { + Transport { + inner: FramedTransport::new(framed, service), + } + } +} + +impl Future for Transport +where + T: AsyncRead + AsyncWrite, + S: Service, + S::Future: 'static, + S::Error: 'static, +{ + type Item = (); + type Error = FramedTransportError; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} diff --git a/tests/test_ws.rs b/tests/test_ws.rs new file mode 100644 index 00000000..86a30919 --- /dev/null +++ b/tests/test_ws.rs @@ -0,0 +1,111 @@ +extern crate actix; +extern crate actix_http; +extern crate actix_net; +extern crate actix_web; +extern crate bytes; +extern crate futures; + +use std::{io, thread}; + +use actix::System; +use actix_net::codec::Framed; +use actix_net::server::Server; +use actix_net::service::IntoNewService; +use actix_web::{test, ws as web_ws}; +use bytes::Bytes; +use futures::future::{ok, Either}; +use futures::{Future, Sink, Stream}; + +use actix_http::{h1, ws, ResponseError}; + +fn ws_handler(req: ws::Message) -> impl Future { + match req { + ws::Message::Ping(msg) => ok(ws::Message::Pong(msg)), + ws::Message::Text(text) => ok(ws::Message::Text(text)), + ws::Message::Binary(bin) => ok(ws::Message::Binary(bin)), + ws::Message::Close(reason) => ok(ws::Message::Close(reason)), + _ => ok(ws::Message::Close(None)), + } +} + +#[test] +fn test_simple() { + let addr = test::TestServer::unused_addr(); + thread::spawn(move || { + Server::new() + .bind("test", addr, move || { + (|io| { + // read http request + let framed = Framed::new(io, h1::Codec::new(false)); + framed + .into_future() + .map_err(|_| ()) + .and_then(|(req, framed)| { + // validate request + if let Some(h1::InMessage::MessageWithPayload(req)) = req { + match ws::handshake(&req) { + Err(e) => { + // validation failed + let resp = e.error_response(); + Either::A( + framed + .send(h1::OutMessage::Response(resp)) + .map_err(|_| ()) + .map(|_| ()), + ) + } + Ok(mut resp) => Either::B( + // send response + framed + .send(h1::OutMessage::Response( + resp.finish(), + )).map_err(|_| ()) + .and_then(|framed| { + // start websocket service + let framed = + framed.into_framed(ws::Codec::new()); + ws::Transport::with(framed, ws_handler) + .map_err(|_| ()) + }), + ), + } + } else { + panic!() + } + }) + }).into_new_service() + }).unwrap() + .run(); + }); + + let mut sys = System::new("test"); + { + let (reader, mut writer) = sys + .block_on(web_ws::Client::new(format!("http://{}/", addr)).connect()) + .unwrap(); + + writer.text("text"); + let (item, reader) = sys.block_on(reader.into_future()).unwrap(); + assert_eq!(item, Some(web_ws::Message::Text("text".to_owned()))); + + writer.binary(b"text".as_ref()); + let (item, reader) = sys.block_on(reader.into_future()).unwrap(); + assert_eq!( + item, + Some(web_ws::Message::Binary(Bytes::from_static(b"text").into())) + ); + + writer.ping("ping"); + let (item, reader) = sys.block_on(reader.into_future()).unwrap(); + assert_eq!(item, Some(web_ws::Message::Pong("ping".to_owned()))); + + writer.close(Some(web_ws::CloseCode::Normal.into())); + let (item, _) = sys.block_on(reader.into_future()).unwrap(); + assert_eq!( + item, + Some(web_ws::Message::Close(Some( + web_ws::CloseCode::Normal.into() + ))) + ); + } +}