//! Http client request #![allow(unused_imports, dead_code)] use std::{fmt, io, str}; use std::rc::Rc; use std::time::Duration; use std::cell::UnsafeCell; use base64; use rand; use cookie::Cookie; use bytes::BytesMut; use http::{HttpTryFrom, StatusCode, Error as HttpError}; use http::header::{self, HeaderName, HeaderValue}; use sha1::Sha1; use futures::{Async, Future, Poll, Stream}; use futures::future::{Either, err as FutErr}; use tokio_core::net::TcpStream; use actix::prelude::*; use body::Binary; use error::UrlParseError; use server::shared::SharedBytes; use server::{utils, IoStream}; use client::{ClientRequest, ClientRequestBuilder, HttpResponseParser, HttpResponseParserError, HttpClientWriter}; use client::{Connect, Connection, ClientConnector, ClientConnectorError}; use super::Message; use super::frame::Frame; use super::proto::{CloseCode, OpCode}; /// Websocket client error #[derive(Fail, Debug)] pub enum WsClientError { #[fail(display="Invalid url")] InvalidUrl, #[fail(display="Invalid response status")] InvalidResponseStatus, #[fail(display="Invalid upgrade header")] InvalidUpgradeHeader, #[fail(display="Invalid connection header")] InvalidConnectionHeader, #[fail(display="Invalid challenge response")] InvalidChallengeResponse, #[fail(display="Http parsing error")] Http(HttpError), #[fail(display="Url parsing error")] Url(UrlParseError), #[fail(display="Response parsing error")] ResponseParseError(HttpResponseParserError), #[fail(display="{}", _0)] Connector(ClientConnectorError), #[fail(display="{}", _0)] Io(io::Error), #[fail(display="Disconnected")] Disconnected, } impl From for WsClientError { fn from(err: HttpError) -> WsClientError { WsClientError::Http(err) } } impl From for WsClientError { fn from(err: UrlParseError) -> WsClientError { WsClientError::Url(err) } } impl From for WsClientError { fn from(err: ClientConnectorError) -> WsClientError { WsClientError::Connector(err) } } impl From for WsClientError { fn from(err: io::Error) -> WsClientError { WsClientError::Io(err) } } impl From for WsClientError { fn from(err: HttpResponseParserError) -> WsClientError { WsClientError::ResponseParseError(err) } } /// `WebSocket` client /// /// Example of `WebSocket` client usage is available in /// [websocket example]( /// https://github.com/actix/actix-web/blob/master/examples/websocket/src/client.rs#L24) pub struct WsClient { request: ClientRequestBuilder, err: Option, http_err: Option, origin: Option, protocols: Option, conn: Addr, } impl WsClient { /// Create new websocket connection pub fn new>(uri: S) -> WsClient { WsClient::with_connector(uri, ClientConnector::from_registry()) } /// Create new websocket connection with custom `ClientConnector` pub fn with_connector>(uri: S, conn: Addr) -> WsClient { let mut cl = WsClient { request: ClientRequest::build(), err: None, http_err: None, origin: None, protocols: None, conn: conn, }; cl.request.uri(uri.as_ref()); cl } /// Set supported websocket protocols pub fn protocols(mut self, protos: U) -> Self where U: IntoIterator + 'static, V: AsRef { let mut protos = protos.into_iter() .fold(String::new(), |acc, s| {acc + s.as_ref() + ","}); protos.pop(); self.protocols = Some(protos); self } /// Set cookie for handshake request pub fn cookie(mut self, cookie: Cookie) -> Self { self.request.cookie(cookie); self } /// Set request Origin pub fn origin(mut self, origin: V) -> Self where HeaderValue: HttpTryFrom { match HeaderValue::try_from(origin) { Ok(value) => self.origin = Some(value), Err(e) => self.http_err = Some(e.into()), } self } /// Set request header pub fn header(mut self, key: K, value: V) -> Self where HeaderName: HttpTryFrom, HeaderValue: HttpTryFrom { self.request.header(key, value); self } /// Connect to websocket server and do ws handshake pub fn connect(&mut self) -> WsHandshake { if let Some(e) = self.err.take() { WsHandshake::new(None, Some(e), &self.conn) } else if let Some(e) = self.http_err.take() { WsHandshake::new(None, Some(e.into()), &self.conn) } else { // origin if let Some(origin) = self.origin.take() { self.request.set_header(header::ORIGIN, origin); } self.request.upgrade(); self.request.set_header(header::UPGRADE, "websocket"); self.request.set_header(header::CONNECTION, "upgrade"); self.request.set_header("SEC-WEBSOCKET-VERSION", "13"); if let Some(protocols) = self.protocols.take() { self.request.set_header("SEC-WEBSOCKET-PROTOCOL", protocols.as_str()); } let request = match self.request.finish() { Ok(req) => req, Err(err) => return WsHandshake::new(None, Some(err.into()), &self.conn), }; if request.uri().host().is_none() { return WsHandshake::new(None, Some(WsClientError::InvalidUrl), &self.conn) } if let Some(scheme) = request.uri().scheme_part() { if scheme != "http" && scheme != "https" && scheme != "ws" && scheme != "wss" { return WsHandshake::new( None, Some(WsClientError::InvalidUrl), &self.conn) } } else { return WsHandshake::new(None, Some(WsClientError::InvalidUrl), &self.conn) } // start handshake WsHandshake::new(Some(request), None, &self.conn) } } } struct WsInner { conn: Connection, writer: HttpClientWriter, parser: HttpResponseParser, parser_buf: BytesMut, closed: bool, error_sent: bool, } pub struct WsHandshake { inner: Option, request: Option, sent: bool, key: String, error: Option, stream: Option, Error=WsClientError>>>, } impl WsHandshake { fn new(request: Option, err: Option, conn: &Addr) -> WsHandshake { // Generate a random key for the `Sec-WebSocket-Key` header. // a base64-encoded (see Section 4 of [RFC4648]) value that, // when decoded, is 16 bytes in length (RFC 6455) let sec_key: [u8; 16] = rand::random(); let key = base64::encode(&sec_key); if let Some(mut request) = request { let stream = Box::new( conn.send(Connect(request.uri().clone())) .map(|res| res.map_err(|e| e.into())) .map_err(|_| WsClientError::Disconnected)); request.headers_mut().insert( HeaderName::try_from("SEC-WEBSOCKET-KEY").unwrap(), HeaderValue::try_from(key.as_str()).unwrap()); WsHandshake { key: key, inner: None, request: Some(request), sent: false, error: err, stream: Some(stream), } } else { WsHandshake { key: key, inner: None, request: None, sent: false, error: err, stream: None, } } } } impl Future for WsHandshake { type Item = (WsClientReader, WsClientWriter); type Error = WsClientError; fn poll(&mut self) -> Poll { if let Some(err) = self.error.take() { return Err(err) } if self.stream.is_some() { match self.stream.as_mut().unwrap().poll()? { Async::Ready(result) => match result { Ok(conn) => { let inner = WsInner { conn: conn, writer: HttpClientWriter::new(SharedBytes::default()), parser: HttpResponseParser::default(), parser_buf: BytesMut::new(), closed: false, error_sent: false, }; self.stream.take(); self.inner = Some(inner); } Err(err) => return Err(err), }, Async::NotReady => return Ok(Async::NotReady) } } let mut inner = self.inner.take().unwrap(); if !self.sent { self.sent = true; inner.writer.start(self.request.as_mut().unwrap())?; } if let Err(err) = inner.writer.poll_completed(&mut inner.conn, false) { return Err(err.into()) } match inner.parser.parse(&mut inner.conn, &mut inner.parser_buf) { Ok(Async::Ready(resp)) => { // verify response if resp.status() != StatusCode::SWITCHING_PROTOCOLS { return Err(WsClientError::InvalidResponseStatus) } // Check for "UPGRADE" to websocket header let has_hdr = if let Some(hdr) = resp.headers().get(header::UPGRADE) { if let Ok(s) = hdr.to_str() { s.to_lowercase().contains("websocket") } else { false } } else { false }; if !has_hdr { return Err(WsClientError::InvalidUpgradeHeader) } // Check for "CONNECTION" header let has_hdr = if let Some(conn) = resp.headers().get(header::CONNECTION) { if let Ok(s) = conn.to_str() { s.to_lowercase().contains("upgrade") } else { false } } else { false }; if !has_hdr { return Err(WsClientError::InvalidConnectionHeader) } let match_key = if let Some(key) = resp.headers().get( HeaderName::try_from("SEC-WEBSOCKET-ACCEPT").unwrap()) { // field is constructed by concatenating /key/ // with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (RFC 6455) const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; let mut sha1 = Sha1::new(); sha1.update(self.key.as_ref()); sha1.update(WS_GUID); key.as_bytes() == base64::encode(&sha1.digest().bytes()).as_bytes() } else { false }; if !match_key { return Err(WsClientError::InvalidChallengeResponse) } let inner = Rc::new(UnsafeCell::new(inner)); Ok(Async::Ready( (WsClientReader{inner: Rc::clone(&inner)}, WsClientWriter{inner: inner}))) }, Ok(Async::NotReady) => { self.inner = Some(inner); Ok(Async::NotReady) }, Err(err) => Err(err.into()) } } } pub struct WsClientReader { inner: Rc> } impl fmt::Debug for WsClientReader { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "WsClientReader()") } } impl WsClientReader { #[inline] fn as_mut(&mut self) -> &mut WsInner { unsafe{ &mut *self.inner.get() } } } impl Stream for WsClientReader { type Item = Message; type Error = WsClientError; fn poll(&mut self) -> Poll, Self::Error> { let inner = self.as_mut(); let mut done = false; match utils::read_from_io(&mut inner.conn, &mut inner.parser_buf) { Ok(Async::Ready(0)) => { done = true; inner.closed = true; }, Ok(Async::Ready(_)) | Ok(Async::NotReady) => (), Err(err) => return Err(err.into()) } // write let _ = inner.writer.poll_completed(&mut inner.conn, false); // read match Frame::parse(&mut inner.parser_buf, false) { Ok(Some(frame)) => { // trace!("WsFrame {}", frame); let (_finished, opcode, payload) = frame.unpack(); match opcode { OpCode::Continue => unimplemented!(), OpCode::Bad => Ok(Async::Ready(Some(Message::Error))), OpCode::Close => { inner.closed = true; inner.error_sent = true; Ok(Async::Ready(Some(Message::Closed))) }, OpCode::Ping => Ok(Async::Ready(Some( Message::Ping( String::from_utf8_lossy(payload.as_ref()).into())))), OpCode::Pong => Ok(Async::Ready(Some( Message::Pong( String::from_utf8_lossy(payload.as_ref()).into())))), OpCode::Binary => Ok(Async::Ready(Some(Message::Binary(payload)))), OpCode::Text => { let tmp = Vec::from(payload.as_ref()); match String::from_utf8(tmp) { Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), Err(_) => Ok(Async::Ready(Some(Message::Error))), } } } } Ok(None) => { if done { Ok(Async::Ready(None)) } else if inner.closed { if !inner.error_sent { inner.error_sent = true; Ok(Async::Ready(Some(Message::Closed))) } else { Ok(Async::Ready(None)) } } else { Ok(Async::NotReady) } }, Err(err) => { inner.closed = true; inner.error_sent = true; Err(err.into()) } } } } pub struct WsClientWriter { inner: Rc> } impl WsClientWriter { #[inline] fn as_mut(&mut self) -> &mut WsInner { unsafe{ &mut *self.inner.get() } } } impl WsClientWriter { /// Write payload #[inline] fn write(&mut self, data: Binary) { if !self.as_mut().closed { let _ = self.as_mut().writer.write(data); } else { warn!("Trying to write to disconnected response"); } } /// Send text frame #[inline] pub fn text>(&mut self, text: T) { self.write(Frame::message(text.into(), OpCode::Text, true, true)); } /// Send binary frame #[inline] pub fn binary>(&mut self, data: B) { self.write(Frame::message(data, OpCode::Binary, true, true)); } /// Send ping frame #[inline] pub fn ping(&mut self, message: &str) { self.write(Frame::message(Vec::from(message), OpCode::Ping, true, true)); } /// Send pong frame #[inline] pub fn pong(&mut self, message: &str) { self.write(Frame::message(Vec::from(message), OpCode::Pong, true, true)); } /// Send close frame #[inline] pub fn close(&mut self, code: CloseCode, reason: &str) { self.write(Frame::close(code, reason, true)); } }