1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-25 06:39:22 +02:00

add client websockets support

This commit is contained in:
Nikolay Kim
2019-03-27 18:53:19 -07:00
parent e254fe4f9c
commit c59937784e
19 changed files with 709 additions and 536 deletions

View File

@ -1,11 +1,13 @@
use std::{fmt, time};
use std::{fmt, io, time};
use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes;
use futures::Future;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::{Buf, Bytes};
use futures::future::{err, Either, Future, FutureResult};
use futures::Poll;
use h2::client::SendRequest;
use crate::body::MessageBody;
use crate::h1::ClientCodec;
use crate::message::{RequestHead, ResponseHead};
use crate::payload::Payload;
@ -19,6 +21,7 @@ pub(crate) enum ConnectionType<Io> {
}
pub trait Connection {
type Io: AsyncRead + AsyncWrite;
type Future: Future<Item = (ResponseHead, Payload), Error = SendRequestError>;
/// Send request and body
@ -27,6 +30,14 @@ pub trait Connection {
head: RequestHead,
body: B,
) -> Self::Future;
type TunnelFuture: Future<
Item = (ResponseHead, Framed<Self::Io, ClientCodec>),
Error = SendRequestError,
>;
/// Send request, returns Response and Framed
fn open_tunnel(self, head: RequestHead) -> Self::TunnelFuture;
}
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
@ -80,6 +91,7 @@ impl<T> Connection for IoConnection<T>
where
T: AsyncRead + AsyncWrite + 'static,
{
type Io = T;
type Future = Box<Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
fn send_request<B: MessageBody + 'static>(
@ -104,6 +116,35 @@ where
)),
}
}
type TunnelFuture = Either<
Box<
Future<
Item = (ResponseHead, Framed<Self::Io, ClientCodec>),
Error = SendRequestError,
>,
>,
FutureResult<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
>;
/// Send request, returns Response and Framed
fn open_tunnel(mut self, head: RequestHead) -> Self::TunnelFuture {
match self.io.take().unwrap() {
ConnectionType::H1(io) => {
Either::A(Box::new(h1proto::open_tunnel(io, head)))
}
ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() {
pool.release(IoConnection::new(
ConnectionType::H2(io),
self.created,
None,
));
}
Either::B(err(SendRequestError::TunnelNotSupported))
}
}
}
}
#[allow(dead_code)]
@ -117,6 +158,7 @@ where
A: AsyncRead + AsyncWrite + 'static,
B: AsyncRead + AsyncWrite + 'static,
{
type Io = EitherIo<A, B>;
type Future = Box<Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
fn send_request<RB: MessageBody + 'static>(
@ -129,4 +171,99 @@ where
EitherConnection::B(con) => con.send_request(head, body),
}
}
type TunnelFuture = Box<
Future<
Item = (ResponseHead, Framed<Self::Io, ClientCodec>),
Error = SendRequestError,
>,
>;
/// Send request, returns Response and Framed
fn open_tunnel(self, head: RequestHead) -> Self::TunnelFuture {
match self {
EitherConnection::A(con) => Box::new(
con.open_tunnel(head)
.map(|(head, framed)| (head, framed.map_io(|io| EitherIo::A(io)))),
),
EitherConnection::B(con) => Box::new(
con.open_tunnel(head)
.map(|(head, framed)| (head, framed.map_io(|io| EitherIo::B(io)))),
),
}
}
}
pub enum EitherIo<A, B> {
A(A),
B(B),
}
impl<A, B> io::Read for EitherIo<A, B>
where
A: io::Read,
B: io::Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
EitherIo::A(ref mut val) => val.read(buf),
EitherIo::B(ref mut val) => val.read(buf),
}
}
}
impl<A, B> AsyncRead for EitherIo<A, B>
where
A: AsyncRead,
B: AsyncRead,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self {
EitherIo::A(ref val) => val.prepare_uninitialized_buffer(buf),
EitherIo::B(ref val) => val.prepare_uninitialized_buffer(buf),
}
}
}
impl<A, B> io::Write for EitherIo<A, B>
where
A: io::Write,
B: io::Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
EitherIo::A(ref mut val) => val.write(buf),
EitherIo::B(ref mut val) => val.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match self {
EitherIo::A(ref mut val) => val.flush(),
EitherIo::B(ref mut val) => val.flush(),
}
}
}
impl<A, B> AsyncWrite for EitherIo<A, B>
where
A: AsyncWrite,
B: AsyncWrite,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
match self {
EitherIo::A(ref mut val) => val.shutdown(),
EitherIo::B(ref mut val) => val.shutdown(),
}
}
fn write_buf<U: Buf>(&mut self, buf: &mut U) -> Poll<usize, io::Error>
where
Self: Sized,
{
match self {
EitherIo::A(ref mut val) => val.write_buf(buf),
EitherIo::B(ref mut val) => val.write_buf(buf),
}
}
}

View File

@ -105,6 +105,9 @@ pub enum SendRequestError {
/// Http2 error
#[display(fmt = "{}", _0)]
H2(h2::Error),
/// Tunnels are not supported for http2 connection
#[display(fmt = "Tunnels are not supported for http2 connection")]
TunnelNotSupported,
/// Error sending request body
Body(Error),
}

View File

@ -70,6 +70,32 @@ where
})
}
pub(crate) fn open_tunnel<T>(
io: T,
head: RequestHead,
) -> impl Future<Item = (ResponseHead, Framed<T, h1::ClientCodec>), Error = SendRequestError>
where
T: AsyncRead + AsyncWrite + 'static,
{
// create Framed and send reqest
Framed::new(io, h1::ClientCodec::default())
.send((head, BodySize::None).into())
.from_err()
// read response
.and_then(|framed| {
framed
.into_future()
.map_err(|(e, _)| SendRequestError::from(e))
.and_then(|(head, framed)| {
if let Some(head) = head {
Ok((head, framed))
} else {
Err(SendRequestError::from(ConnectError::Disconnected))
}
})
})
}
#[doc(hidden)]
/// HTTP client connection
pub struct H1Connection<T> {

View File

@ -411,66 +411,6 @@ where
}
}
// struct ConnectorPoolSupport<T, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// {
// connector: T,
// inner: Rc<RefCell<Inner<Io>>>,
// }
// impl<T, Io> Future for ConnectorPoolSupport<T, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// T: Service<Connect, Response = (Io, Protocol), Error = ConnectorError>,
// T::Future: 'static,
// {
// type Item = ();
// type Error = ();
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// let mut inner = self.inner.as_ref().borrow_mut();
// inner.task.register();
// // check waiters
// loop {
// let (key, token) = {
// if let Some((key, token)) = inner.waiters_queue.get_index(0) {
// (key.clone(), *token)
// } else {
// break;
// }
// };
// match inner.acquire(&key) {
// Acquire::NotAvailable => break,
// Acquire::Acquired(io, created) => {
// let (_, tx) = inner.waiters.remove(token);
// if let Err(conn) = tx.send(Ok(IoConnection::new(
// io,
// created,
// Some(Acquired(key.clone(), Some(self.inner.clone()))),
// ))) {
// let (io, created) = conn.unwrap().into_inner();
// inner.release_conn(&key, io, created);
// }
// }
// Acquire::Available => {
// let (connect, tx) = inner.waiters.remove(token);
// OpenWaitingConnection::spawn(
// key.clone(),
// tx,
// self.inner.clone(),
// self.connector.call(connect),
// );
// }
// }
// let _ = inner.waiters_queue.swap_remove_index(0);
// }
// Ok(Async::NotReady)
// }
// }
struct CloseConnection<T> {
io: T,
timeout: Delay,

View File

@ -611,7 +611,6 @@ mod tests {
use super::*;
use crate::error::ParseError;
use crate::httpmessage::HttpMessage;
use crate::message::Head;
impl PayloadType {
fn unwrap(self) -> PayloadDecoder {

View File

@ -1,109 +0,0 @@
//! Http client request
use std::str;
#[cfg(feature = "cookies")]
use cookie::Cookie;
use http::header::{HeaderName, HeaderValue};
use http::{Error as HttpError, HttpTryFrom, Uri};
use super::ClientError;
use crate::header::IntoHeaderValue;
use crate::message::RequestHead;
/// `WebSocket` connection
pub struct Connect {
pub(super) head: RequestHead,
pub(super) err: Option<ClientError>,
pub(super) http_err: Option<HttpError>,
pub(super) origin: Option<HeaderValue>,
pub(super) protocols: Option<String>,
pub(super) max_size: usize,
pub(super) server_mode: bool,
}
impl Connect {
/// Create new websocket connection
pub fn new<S: AsRef<str>>(uri: S) -> Connect {
let mut cl = Connect {
head: RequestHead::default(),
err: None,
http_err: None,
origin: None,
protocols: None,
max_size: 65_536,
server_mode: false,
};
match Uri::try_from(uri.as_ref()) {
Ok(uri) => cl.head.uri = uri,
Err(e) => cl.http_err = Some(e.into()),
}
cl
}
/// Set supported websocket protocols
pub fn protocols<U, V>(mut self, protos: U) -> Self
where
U: IntoIterator<Item = V> + 'static,
V: AsRef<str>,
{
let mut protos = protos
.into_iter()
.fold(String::new(), |acc, s| acc + s.as_ref() + ",");
protos.pop();
self.protocols = Some(protos);
self
}
// #[cfg(feature = "cookies")]
// /// Set cookie for handshake request
// pub fn cookie(mut self, cookie: Cookie) -> Self {
// self.request.cookie(cookie);
// self
// }
/// Set request Origin
pub fn origin<V>(mut self, origin: V) -> Self
where
HeaderValue: HttpTryFrom<V>,
{
match HeaderValue::try_from(origin) {
Ok(value) => self.origin = Some(value),
Err(e) => self.http_err = Some(e.into()),
}
self
}
/// Set max frame size
///
/// By default max size is set to 64kb
pub fn max_frame_size(mut self, size: usize) -> Self {
self.max_size = size;
self
}
/// Disable payload masking. By default ws client masks frame payload.
pub fn server_mode(mut self) -> Self {
self.server_mode = true;
self
}
/// Set request header
pub fn header<K, V>(mut self, key: K, value: V) -> Self
where
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
match HeaderName::try_from(key) {
Ok(key) => match value.try_into() {
Ok(value) => {
self.head.headers.append(key, value);
}
Err(e) => self.http_err = Some(e.into()),
},
Err(e) => self.http_err = Some(e.into()),
}
self
}
}

View File

@ -1,53 +0,0 @@
//! Http client request
use std::io;
use actix_connect::ConnectError;
use derive_more::{Display, From};
use http::{header::HeaderValue, Error as HttpError, StatusCode};
use crate::error::ParseError;
use crate::ws::ProtocolError;
/// Websocket client error
#[derive(Debug, Display, From)]
pub enum ClientError {
/// Invalid url
#[display(fmt = "Invalid url")]
InvalidUrl,
/// Invalid response status
#[display(fmt = "Invalid response status")]
InvalidResponseStatus(StatusCode),
/// Invalid upgrade header
#[display(fmt = "Invalid upgrade header")]
InvalidUpgradeHeader,
/// Invalid connection header
#[display(fmt = "Invalid connection header")]
InvalidConnectionHeader(HeaderValue),
/// Missing CONNECTION header
#[display(fmt = "Missing CONNECTION header")]
MissingConnectionHeader,
/// Missing SEC-WEBSOCKET-ACCEPT header
#[display(fmt = "Missing SEC-WEBSOCKET-ACCEPT header")]
MissingWebSocketAcceptHeader,
/// Invalid challenge response
#[display(fmt = "Invalid challenge response")]
InvalidChallengeResponse(String, HeaderValue),
/// Http parsing error
#[display(fmt = "Http parsing error")]
Http(HttpError),
/// Response parsing error
#[display(fmt = "Response parsing error: {}", _0)]
ParseError(ParseError),
/// Protocol error
#[display(fmt = "{}", _0)]
Protocol(ProtocolError),
/// Connect error
#[display(fmt = "Connector error: {:?}", _0)]
Connect(ConnectError),
/// IO Error
#[display(fmt = "{}", _0)]
Io(io::Error),
/// "Disconnected"
#[display(fmt = "Disconnected")]
Disconnected,
}

View File

@ -1,48 +0,0 @@
mod connect;
mod error;
mod service;
pub use self::connect::Connect;
pub use self::error::ClientError;
pub use self::service::Client;
#[derive(PartialEq, Hash, Debug, Clone, Copy)]
pub(crate) enum Protocol {
Http,
Https,
Ws,
Wss,
}
impl Protocol {
fn from(s: &str) -> Option<Protocol> {
match s {
"http" => Some(Protocol::Http),
"https" => Some(Protocol::Https),
"ws" => Some(Protocol::Ws),
"wss" => Some(Protocol::Wss),
_ => None,
}
}
// fn is_http(self) -> bool {
// match self {
// Protocol::Https | Protocol::Http => true,
// _ => false,
// }
// }
// fn is_secure(self) -> bool {
// match self {
// Protocol::Https | Protocol::Wss => true,
// _ => false,
// }
// }
fn port(self) -> u16 {
match self {
Protocol::Http | Protocol::Ws => 80,
Protocol::Https | Protocol::Wss => 443,
}
}
}

View File

@ -1,272 +0,0 @@
//! websockets client
use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_connect::{default_connector, Connect as TcpConnect, ConnectError};
use actix_service::{apply_fn, Service};
use base64;
use futures::future::{err, Either, FutureResult};
use futures::{try_ready, Async, Future, Poll, Sink, Stream};
use http::header::{self, HeaderValue};
use http::{HttpTryFrom, StatusCode};
use log::trace;
use rand;
use sha1::Sha1;
use crate::body::BodySize;
use crate::h1;
use crate::message::{ConnectionType, ResponseHead};
use crate::ws::Codec;
use super::{ClientError, Connect, Protocol};
/// WebSocket's client
pub struct Client<T> {
connector: T,
}
impl Client<()> {
/// Create client with default connector.
pub fn default() -> Client<
impl Service<
Request = TcpConnect<String>,
Response = impl AsyncRead + AsyncWrite,
Error = ConnectError,
> + Clone,
> {
Client::new(apply_fn(default_connector(), |msg: TcpConnect<_>, srv| {
srv.call(msg).map(|stream| stream.into_parts().0)
}))
}
}
impl<T> Client<T>
where
T: Service<Request = TcpConnect<String>, Error = ConnectError>,
T::Response: AsyncRead + AsyncWrite,
{
/// Create new websocket's client factory
pub fn new(connector: T) -> Self {
Client { connector }
}
}
impl<T> Clone for Client<T>
where
T: Service<Request = TcpConnect<String>, Error = ConnectError> + Clone,
T::Response: AsyncRead + AsyncWrite,
{
fn clone(&self) -> Self {
Client {
connector: self.connector.clone(),
}
}
}
impl<T> Service for Client<T>
where
T: Service<Request = TcpConnect<String>, Error = ConnectError>,
T::Response: AsyncRead + AsyncWrite + 'static,
T::Future: 'static,
{
type Request = Connect;
type Response = Framed<T::Response, Codec>;
type Error = ClientError;
type Future = Either<
FutureResult<Self::Response, Self::Error>,
ClientResponseFut<T::Response>,
>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.connector.poll_ready().map_err(ClientError::from)
}
fn call(&mut self, mut req: Connect) -> Self::Future {
if let Some(e) = req.err.take() {
Either::A(err(e))
} else if let Some(e) = req.http_err.take() {
Either::A(err(e.into()))
} else {
// origin
if let Some(origin) = req.origin.take() {
req.head.headers.insert(header::ORIGIN, origin);
}
req.head.set_connection_type(ConnectionType::Upgrade);
req.head
.headers
.insert(header::UPGRADE, HeaderValue::from_static("websocket"));
req.head.headers.insert(
header::SEC_WEBSOCKET_VERSION,
HeaderValue::from_static("13"),
);
if let Some(protocols) = req.protocols.take() {
req.head.headers.insert(
header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::try_from(protocols.as_str()).unwrap(),
);
}
if let Some(e) = req.http_err {
return Either::A(err(e.into()));
};
let mut request = req.head;
if request.uri.host().is_none() {
return Either::A(err(ClientError::InvalidUrl));
}
// supported protocols
let proto = if let Some(scheme) = request.uri.scheme_part() {
match Protocol::from(scheme.as_str()) {
Some(proto) => proto,
None => return Either::A(err(ClientError::InvalidUrl)),
}
} else {
return Either::A(err(ClientError::InvalidUrl));
};
// Generate a random key for the `Sec-WebSocket-Key` header.
// a base64-encoded (see Section 4 of [RFC4648]) value that,
// when decoded, is 16 bytes in length (RFC 6455)
let sec_key: [u8; 16] = rand::random();
let key = base64::encode(&sec_key);
request.headers.insert(
header::SEC_WEBSOCKET_KEY,
HeaderValue::try_from(key.as_str()).unwrap(),
);
// prep connection
let connect = TcpConnect::new(request.uri.host().unwrap().to_string())
.set_port(request.uri.port_u16().unwrap_or_else(|| proto.port()));
let fut = Box::new(
self.connector
.call(connect)
.map_err(ClientError::from)
.and_then(move |io| {
// h1 protocol
let framed = Framed::new(io, h1::ClientCodec::default());
framed
.send((request, BodySize::None).into())
.map_err(ClientError::from)
.and_then(|framed| {
framed
.into_future()
.map_err(|(e, _)| ClientError::from(e))
})
}),
);
// start handshake
Either::B(ClientResponseFut {
key,
fut,
max_size: req.max_size,
server_mode: req.server_mode,
_t: PhantomData,
})
}
}
}
/// Future that implementes client websocket handshake process.
///
/// It resolves to a `Framed<T, ws::Codec>` instance.
pub struct ClientResponseFut<T>
where
T: AsyncRead + AsyncWrite,
{
fut: Box<
Future<
Item = (Option<ResponseHead>, Framed<T, h1::ClientCodec>),
Error = ClientError,
>,
>,
key: String,
max_size: usize,
server_mode: bool,
_t: PhantomData<T>,
}
impl<T> Future for ClientResponseFut<T>
where
T: AsyncRead + AsyncWrite,
{
type Item = Framed<T, Codec>;
type Error = ClientError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (item, framed) = try_ready!(self.fut.poll());
let res = match item {
Some(res) => res,
None => return Err(ClientError::Disconnected),
};
// verify response
if res.status != StatusCode::SWITCHING_PROTOCOLS {
return Err(ClientError::InvalidResponseStatus(res.status));
}
// Check for "UPGRADE" to websocket header
let has_hdr = if let Some(hdr) = res.headers.get(header::UPGRADE) {
if let Ok(s) = hdr.to_str() {
s.to_lowercase().contains("websocket")
} else {
false
}
} else {
false
};
if !has_hdr {
trace!("Invalid upgrade header");
return Err(ClientError::InvalidUpgradeHeader);
}
// Check for "CONNECTION" header
if let Some(conn) = res.headers.get(header::CONNECTION) {
if let Ok(s) = conn.to_str() {
if !s.to_lowercase().contains("upgrade") {
trace!("Invalid connection header: {}", s);
return Err(ClientError::InvalidConnectionHeader(conn.clone()));
}
} else {
trace!("Invalid connection header: {:?}", conn);
return Err(ClientError::InvalidConnectionHeader(conn.clone()));
}
} else {
trace!("Missing connection header");
return Err(ClientError::MissingConnectionHeader);
}
if let Some(key) = res.headers.get(header::SEC_WEBSOCKET_ACCEPT) {
// field is constructed by concatenating /key/
// with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (RFC 6455)
const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
let mut sha1 = Sha1::new();
sha1.update(self.key.as_ref());
sha1.update(WS_GUID);
let encoded = base64::encode(&sha1.digest().bytes());
if key.as_bytes() != encoded.as_bytes() {
trace!(
"Invalid challenge response: expected: {} received: {:?}",
encoded,
key
);
return Err(ClientError::InvalidChallengeResponse(encoded, key.clone()));
}
} else {
trace!("Missing SEC-WEBSOCKET-ACCEPT header");
return Err(ClientError::MissingWebSocketAcceptHeader);
};
// websockets codec
let codec = if self.server_mode {
Codec::new().max_size(self.max_size)
} else {
Codec::new().max_size(self.max_size).client_mode()
};
Ok(Async::Ready(framed.into_framed(codec)))
}
}

View File

@ -13,7 +13,6 @@ use crate::httpmessage::HttpMessage;
use crate::request::Request;
use crate::response::{Response, ResponseBuilder};
mod client;
mod codec;
mod frame;
mod mask;
@ -21,7 +20,6 @@ mod proto;
mod service;
mod transport;
pub use self::client::{Client, ClientError, Connect};
pub use self::codec::{Codec, Frame, Message};
pub use self::frame::Parser;
pub use self::proto::{hash_key, CloseCode, CloseReason, OpCode};