1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-18 05:41:50 +01:00

change ws client names

This commit is contained in:
Nikolay Kim 2018-01-29 15:45:37 -08:00
parent 6e51573975
commit 5cc3bba5cc
4 changed files with 46 additions and 43 deletions

View File

@ -13,7 +13,7 @@ use std::time::Duration;
use actix::*;
use futures::Future;
use tokio_core::net::TcpStream;
use actix_web::ws::{client, Message, WsClientError};
use actix_web::ws::{Message, WsClientError, WsClient, WsClientWriter};
fn main() {
@ -22,7 +22,7 @@ fn main() {
let sys = actix::System::new("ws-example");
Arbiter::handle().spawn(
client::WsClient::new("http://127.0.0.1:8080/ws/")
WsClient::new("http://127.0.0.1:8080/ws/")
.connect().unwrap()
.map_err(|e| {
println!("Error: {}", e);
@ -54,7 +54,7 @@ fn main() {
}
struct ChatClient(client::WsWriter<TcpStream>);
struct ChatClient(WsClientWriter<TcpStream>);
#[derive(Message)]
struct ClientCommand(String);

View File

@ -43,32 +43,6 @@ pub enum HttpResponseParserError {
impl HttpResponseParser {
fn decode(&mut self, buf: &mut BytesMut) -> Result<Decoding, HttpResponseParserError> {
if let Some(ref mut payload) = self.payload {
if payload.tx.capacity() > DEFAULT_BUFFER_SIZE {
return Ok(Decoding::Paused)
}
loop {
match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes)
},
Ok(Async::Ready(None)) => {
payload.tx.feed_eof();
return Ok(Decoding::Ready)
},
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
Err(err) => {
payload.tx.set_error(err.into());
return Err(HttpResponseParserError::Payload)
}
}
}
} else {
return Ok(Decoding::Ready)
}
}
pub fn parse<T>(&mut self, io: &mut T, buf: &mut BytesMut)
-> Poll<ClientResponse, HttpResponseParserError>
where T: IoStream
@ -154,7 +128,34 @@ impl HttpResponseParser {
}
}
fn parse_message(buf: &mut BytesMut) -> Poll<(ClientResponse, Option<PayloadInfo>), ParseError>
fn decode(&mut self, buf: &mut BytesMut) -> Result<Decoding, HttpResponseParserError> {
if let Some(ref mut payload) = self.payload {
if payload.tx.capacity() > DEFAULT_BUFFER_SIZE {
return Ok(Decoding::Paused)
}
loop {
match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes)
},
Ok(Async::Ready(None)) => {
payload.tx.feed_eof();
return Ok(Decoding::Ready)
},
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
Err(err) => {
payload.tx.set_error(err.into());
return Err(HttpResponseParserError::Payload)
}
}
}
} else {
return Ok(Decoding::Ready)
}
}
fn parse_message(buf: &mut BytesMut)
-> Poll<(ClientResponse, Option<PayloadInfo>), ParseError>
{
// Parse http message
let bytes_ptr = buf.as_ref().as_ptr() as usize;

View File

@ -28,6 +28,10 @@ use super::proto::{CloseCode, OpCode};
use super::frame::Frame;
use super::connect::{TcpConnector, TcpConnectorError};
pub type WsClientFuture<T> =
Future<Item=(WsClientReader<T>, WsClientWriter<T>), Error=WsClientError>;
/// Websockt client error
#[derive(Fail, Debug)]
pub enum WsClientError {
@ -85,8 +89,6 @@ impl From<HttpResponseParserError> for WsClientError {
}
}
pub type WsFuture<T> = Future<Item=(WsReader<T>, WsWriter<T>), Error=WsClientError>;
/// Websockt client
pub struct WsClient {
request: ClientRequestBuilder,
@ -143,7 +145,7 @@ impl WsClient {
self
}
pub fn connect(&mut self) -> Result<Box<WsFuture<TcpStream>>, WsClientError> {
pub fn connect(&mut self) -> Result<Box<WsClientFuture<TcpStream>>, WsClientError> {
if let Some(e) = self.err.take() {
return Err(e)
}
@ -234,7 +236,7 @@ impl<T: IoStream> WsHandshake<T> {
}
impl<T: IoStream> Future for WsHandshake<T> {
type Item = (WsReader<T>, WsWriter<T>);
type Item = (WsClientReader<T>, WsClientWriter<T>);
type Error = WsClientError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -296,8 +298,8 @@ impl<T: IoStream> Future for WsHandshake<T> {
let inner = Rc::new(UnsafeCell::new(Inner{inner: inner}));
Ok(Async::Ready(
(WsReader{inner: Rc::clone(&inner)},
WsWriter{inner: inner})))
(WsClientReader{inner: Rc::clone(&inner)},
WsClientWriter{inner: inner})))
},
Ok(Async::NotReady) => {
self.inner = Some(inner);
@ -313,18 +315,18 @@ struct Inner<T> {
inner: WsInner<T>,
}
pub struct WsReader<T> {
pub struct WsClientReader<T> {
inner: Rc<UnsafeCell<Inner<T>>>
}
impl<T> WsReader<T> {
impl<T> WsClientReader<T> {
#[inline]
fn as_mut(&mut self) -> &mut Inner<T> {
unsafe{ &mut *self.inner.get() }
}
}
impl<T: IoStream> Stream for WsReader<T> {
impl<T: IoStream> Stream for WsClientReader<T> {
type Item = Message;
type Error = WsClientError;
@ -404,18 +406,18 @@ impl<T: IoStream> Stream for WsReader<T> {
}
}
pub struct WsWriter<T> {
pub struct WsClientWriter<T> {
inner: Rc<UnsafeCell<Inner<T>>>
}
impl<T: IoStream> WsWriter<T> {
impl<T: IoStream> WsClientWriter<T> {
#[inline]
fn as_mut(&mut self) -> &mut Inner<T> {
unsafe{ &mut *self.inner.get() }
}
}
impl<T: IoStream> WsWriter<T> {
impl<T: IoStream> WsClientWriter<T> {
/// Write payload
#[inline]

View File

@ -67,7 +67,7 @@ use self::frame::Frame;
use self::proto::{hash_key, OpCode};
pub use self::proto::CloseCode;
pub use self::context::WebsocketContext;
pub use self::client::{WsClient, WsClientError, WsReader, WsWriter, WsFuture};
pub use self::client::{WsClient, WsClientError, WsClientReader, WsClientWriter, WsClientFuture};
const SEC_WEBSOCKET_ACCEPT: &str = "SEC-WEBSOCKET-ACCEPT";
const SEC_WEBSOCKET_KEY: &str = "SEC-WEBSOCKET-KEY";