1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

initial work on client connector

This commit is contained in:
Nikolay Kim 2018-01-29 23:01:20 -08:00
parent 5cc3bba5cc
commit a02e0dfab6
8 changed files with 238 additions and 185 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-web" name = "actix-web"
version = "0.3.4" version = "0.4.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web framework" description = "Actix web framework"
readme = "README.md" readme = "README.md"

View File

@ -12,7 +12,6 @@ use std::time::Duration;
use actix::*; use actix::*;
use futures::Future; use futures::Future;
use tokio_core::net::TcpStream;
use actix_web::ws::{Message, WsClientError, WsClient, WsClientWriter}; use actix_web::ws::{Message, WsClientError, WsClient, WsClientWriter};
@ -54,7 +53,7 @@ fn main() {
} }
struct ChatClient(WsClientWriter<TcpStream>); struct ChatClient(WsClientWriter);
#[derive(Message)] #[derive(Message)]
struct ClientCommand(String); struct ClientCommand(String);

179
src/client/connect.rs Normal file
View File

@ -0,0 +1,179 @@
#![allow(unused_imports, dead_code)]
use std::{io, time};
use std::net::{SocketAddr, Shutdown};
use std::collections::VecDeque;
use std::time::Duration;
use actix::{fut, Actor, ActorFuture, Arbiter, ArbiterService, Context,
Handler, Response, ResponseType, Supervised};
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
use http::Uri;
use futures::{Async, Future, Poll};
use tokio_core::reactor::Timeout;
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_io::{AsyncRead, AsyncWrite};
use server::IoStream;
#[derive(Debug)]
pub struct Connect(pub Uri);
impl ResponseType for Connect {
type Item = Connection;
type Error = ClientConnectorError;
}
#[derive(Fail, Debug)]
pub enum ClientConnectorError {
/// Invalid url
#[fail(display="Invalid url")]
InvalidUrl,
/// SSL feature is not enabled
#[fail(display="SSL is not supported")]
SslIsNotSupported,
/// Connection error
#[fail(display = "{}", _0)]
Connector(ConnectorError),
/// Connecting took too long
#[fail(display = "Timeout out while establishing connection")]
Timeout,
/// Connector has been disconnected
#[fail(display = "Internal error: connector has been disconnected")]
Disconnected,
/// Connection io error
#[fail(display = "{}", _0)]
IoError(io::Error),
}
impl From<ConnectorError> for ClientConnectorError {
fn from(err: ConnectorError) -> ClientConnectorError {
ClientConnectorError::Connector(err)
}
}
#[derive(Debug, Default)]
pub struct ClientConnector {
}
impl Actor for ClientConnector {
type Context = Context<ClientConnector>;
}
impl Supervised for ClientConnector {}
impl ArbiterService for ClientConnector {}
impl Handler<Connect> for ClientConnector {
type Result = Response<ClientConnector, Connect>;
fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
let uri = &msg.0;
if uri.host().is_none() {
return Self::reply(Err(ClientConnectorError::InvalidUrl))
}
let proto = match uri.scheme_part() {
Some(scheme) => match Protocol::from(scheme.as_str()) {
Some(proto) => proto,
None => return Self::reply(Err(ClientConnectorError::InvalidUrl)),
},
None => return Self::reply(Err(ClientConnectorError::InvalidUrl)),
};
let port = uri.port().unwrap_or_else(|| proto.port());
Self::async_reply(
Connector::from_registry()
.call(self, ResolveConnect::host_and_port(uri.host().unwrap(), port))
.map_err(|_, _, _| ClientConnectorError::Disconnected)
.and_then(|res, _, _| match res {
Ok(stream) => fut::ok(Connection{stream: Box::new(stream)}),
Err(err) => fut::err(err.into())
}))
}
}
#[derive(PartialEq, Hash, Debug)]
enum Protocol {
Http,
Https,
Ws,
Wss,
}
impl Protocol {
fn from(s: &str) -> Option<Protocol> {
match s {
"http" => Some(Protocol::Http),
"https" => Some(Protocol::Https),
"ws" => Some(Protocol::Ws),
"wss" => Some(Protocol::Wss),
_ => None,
}
}
fn port(&self) -> u16 {
match *self {
Protocol::Http | Protocol::Ws => 80,
Protocol::Https | Protocol::Wss => 443
}
}
}
pub struct Connection {
stream: Box<IoStream>,
}
impl Connection {
pub fn stream(&mut self) -> &mut IoStream {
&mut *self.stream
}
}
impl IoStream for Connection {
fn shutdown(&mut self, how: Shutdown) -> io::Result<()> {
IoStream::shutdown(&mut *self.stream, how)
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
IoStream::set_nodelay(&mut *self.stream, nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
IoStream::set_linger(&mut *self.stream, dur)
}
}
impl io::Read for Connection {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stream.read(buf)
}
}
impl AsyncRead for Connection {}
impl io::Write for Connection {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.stream.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.stream.flush()
}
}
impl AsyncWrite for Connection {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.stream.shutdown()
}
}

View File

@ -1,3 +1,4 @@
pub(crate) mod connect;
mod parser; mod parser;
mod request; mod request;
mod response; mod response;

View File

@ -147,6 +147,17 @@ pub use native_tls::Pkcs12;
#[cfg(feature="openssl")] #[cfg(feature="openssl")]
pub use openssl::pkcs12::Pkcs12; pub use openssl::pkcs12::Pkcs12;
#[cfg(feature="openssl")]
pub(crate) const HAS_OPENSSL: bool = true;
#[cfg(not(feature="openssl"))]
pub(crate) const HAS_OPENSSL: bool = false;
#[cfg(feature="tls")]
pub(crate) const HAS_TLS: bool = true;
#[cfg(not(feature="tls"))]
pub(crate) const HAS_TLS: bool = false;
pub mod headers { pub mod headers {
//! Headers implementation //! Headers implementation

View File

@ -1,4 +1,5 @@
//! Http client request //! Http client request
#![allow(unused_imports, dead_code)]
use std::{io, str}; use std::{io, str};
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
@ -12,9 +13,11 @@ use http::{HttpTryFrom, StatusCode, Error as HttpError};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use sha1::Sha1; use sha1::Sha1;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
// use futures::unsync::oneshot; use futures::future::{Either, err as FutErr};
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use actix::prelude::*;
use body::Binary; use body::Binary;
use error::UrlParseError; use error::UrlParseError;
use server::shared::SharedBytes; use server::shared::SharedBytes;
@ -22,14 +25,14 @@ use server::shared::SharedBytes;
use server::{utils, IoStream}; use server::{utils, IoStream};
use client::{ClientRequest, ClientRequestBuilder, use client::{ClientRequest, ClientRequestBuilder,
HttpResponseParser, HttpResponseParserError, HttpClientWriter}; HttpResponseParser, HttpResponseParserError, HttpClientWriter};
use client::connect::{Connect, Connection, ClientConnector, ClientConnectorError};
use super::Message; use super::Message;
use super::proto::{CloseCode, OpCode}; use super::proto::{CloseCode, OpCode};
use super::frame::Frame; use super::frame::Frame;
use super::connect::{TcpConnector, TcpConnectorError};
pub type WsClientFuture<T> = pub type WsClientFuture =
Future<Item=(WsClientReader<T>, WsClientWriter<T>), Error=WsClientError>; Future<Item=(WsClientReader, WsClientWriter), Error=WsClientError>;
/// Websockt client error /// Websockt client error
@ -52,7 +55,7 @@ pub enum WsClientError {
#[fail(display="Response parsing error")] #[fail(display="Response parsing error")]
ResponseParseError(HttpResponseParserError), ResponseParseError(HttpResponseParserError),
#[fail(display="{}", _0)] #[fail(display="{}", _0)]
Connection(TcpConnectorError), Connector(ClientConnectorError),
#[fail(display="{}", _0)] #[fail(display="{}", _0)]
Io(io::Error), Io(io::Error),
#[fail(display="Disconnected")] #[fail(display="Disconnected")]
@ -71,9 +74,9 @@ impl From<UrlParseError> for WsClientError {
} }
} }
impl From<TcpConnectorError> for WsClientError { impl From<ClientConnectorError> for WsClientError {
fn from(err: TcpConnectorError) -> WsClientError { fn from(err: ClientConnectorError) -> WsClientError {
WsClientError::Connection(err) WsClientError::Connector(err)
} }
} }
@ -145,7 +148,7 @@ impl WsClient {
self self
} }
pub fn connect(&mut self) -> Result<Box<WsClientFuture<TcpStream>>, WsClientError> { pub fn connect(&mut self) -> Result<Box<WsClientFuture>, WsClientError> {
if let Some(e) = self.err.take() { if let Some(e) = self.err.take() {
return Err(e) return Err(e)
} }
@ -178,19 +181,20 @@ impl WsClient {
return Err(WsClientError::InvalidUrl); return Err(WsClientError::InvalidUrl);
} }
let connect = TcpConnector::new( // get connection and start handshake
request.uri().host().unwrap(),
request.uri().port().unwrap_or(80), Duration::from_secs(5));
Ok(Box::new( Ok(Box::new(
connect ClientConnector::from_registry().call_fut(Connect(request.uri().clone()))
.from_err() .map_err(|_| WsClientError::Disconnected)
.and_then(move |stream| WsHandshake::new(stream, request)))) .and_then(|res| match res {
Ok(stream) => Either::A(WsHandshake::new(stream, request)),
Err(err) => Either::B(FutErr(err.into())),
})
))
} }
} }
struct WsInner<T> { struct WsInner {
stream: T, conn: Connection,
writer: HttpClientWriter, writer: HttpClientWriter,
parser: HttpResponseParser, parser: HttpResponseParser,
parser_buf: BytesMut, parser_buf: BytesMut,
@ -198,15 +202,15 @@ struct WsInner<T> {
error_sent: bool, error_sent: bool,
} }
struct WsHandshake<T> { struct WsHandshake {
inner: Option<WsInner<T>>, inner: Option<WsInner>,
request: ClientRequest, request: ClientRequest,
sent: bool, sent: bool,
key: String, key: String,
} }
impl<T: IoStream> WsHandshake<T> { impl WsHandshake {
fn new(stream: T, mut request: ClientRequest) -> WsHandshake<T> { fn new(conn: Connection, mut request: ClientRequest) -> WsHandshake {
// Generate a random key for the `Sec-WebSocket-Key` header. // Generate a random key for the `Sec-WebSocket-Key` header.
// a base64-encoded (see Section 4 of [RFC4648]) value that, // a base64-encoded (see Section 4 of [RFC4648]) value that,
// when decoded, is 16 bytes in length (RFC 6455) // when decoded, is 16 bytes in length (RFC 6455)
@ -218,7 +222,7 @@ impl<T: IoStream> WsHandshake<T> {
HeaderValue::try_from(key.as_str()).unwrap()); HeaderValue::try_from(key.as_str()).unwrap());
let inner = WsInner { let inner = WsInner {
stream: stream, conn: conn,
writer: HttpClientWriter::new(SharedBytes::default()), writer: HttpClientWriter::new(SharedBytes::default()),
parser: HttpResponseParser::default(), parser: HttpResponseParser::default(),
parser_buf: BytesMut::new(), parser_buf: BytesMut::new(),
@ -235,8 +239,8 @@ impl<T: IoStream> WsHandshake<T> {
} }
} }
impl<T: IoStream> Future for WsHandshake<T> { impl Future for WsHandshake {
type Item = (WsClientReader<T>, WsClientWriter<T>); type Item = (WsClientReader, WsClientWriter);
type Error = WsClientError; type Error = WsClientError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -246,11 +250,11 @@ impl<T: IoStream> Future for WsHandshake<T> {
self.sent = true; self.sent = true;
inner.writer.start(&mut self.request); inner.writer.start(&mut self.request);
} }
if let Err(err) = inner.writer.poll_completed(&mut inner.stream, false) { if let Err(err) = inner.writer.poll_completed(&mut inner.conn, false) {
return Err(err.into()) return Err(err.into())
} }
match inner.parser.parse(&mut inner.stream, &mut inner.parser_buf) { match inner.parser.parse(&mut inner.conn, &mut inner.parser_buf) {
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
// verify response // verify response
if resp.status() != StatusCode::SWITCHING_PROTOCOLS { if resp.status() != StatusCode::SWITCHING_PROTOCOLS {
@ -311,22 +315,22 @@ impl<T: IoStream> Future for WsHandshake<T> {
} }
struct Inner<T> { struct Inner {
inner: WsInner<T>, inner: WsInner,
} }
pub struct WsClientReader<T> { pub struct WsClientReader {
inner: Rc<UnsafeCell<Inner<T>>> inner: Rc<UnsafeCell<Inner>>
} }
impl<T> WsClientReader<T> { impl WsClientReader {
#[inline] #[inline]
fn as_mut(&mut self) -> &mut Inner<T> { fn as_mut(&mut self) -> &mut Inner {
unsafe{ &mut *self.inner.get() } unsafe{ &mut *self.inner.get() }
} }
} }
impl<T: IoStream> Stream for WsClientReader<T> { impl Stream for WsClientReader {
type Item = Message; type Item = Message;
type Error = WsClientError; type Error = WsClientError;
@ -334,7 +338,7 @@ impl<T: IoStream> Stream for WsClientReader<T> {
let inner = self.as_mut(); let inner = self.as_mut();
let mut done = false; let mut done = false;
match utils::read_from_io(&mut inner.inner.stream, &mut inner.inner.parser_buf) { match utils::read_from_io(&mut inner.inner.conn, &mut inner.inner.parser_buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
done = true; done = true;
inner.inner.closed = true; inner.inner.closed = true;
@ -345,7 +349,7 @@ impl<T: IoStream> Stream for WsClientReader<T> {
} }
// write // write
let _ = inner.inner.writer.poll_completed(&mut inner.inner.stream, false); let _ = inner.inner.writer.poll_completed(&mut inner.inner.conn, false);
// read // read
match Frame::parse(&mut inner.inner.parser_buf) { match Frame::parse(&mut inner.inner.parser_buf) {
@ -406,18 +410,18 @@ impl<T: IoStream> Stream for WsClientReader<T> {
} }
} }
pub struct WsClientWriter<T> { pub struct WsClientWriter {
inner: Rc<UnsafeCell<Inner<T>>> inner: Rc<UnsafeCell<Inner>>
} }
impl<T: IoStream> WsClientWriter<T> { impl WsClientWriter {
#[inline] #[inline]
fn as_mut(&mut self) -> &mut Inner<T> { fn as_mut(&mut self) -> &mut Inner {
unsafe{ &mut *self.inner.get() } unsafe{ &mut *self.inner.get() }
} }
} }
impl<T: IoStream> WsClientWriter<T> { impl WsClientWriter {
/// Write payload /// Write payload
#[inline] #[inline]

View File

@ -1,139 +0,0 @@
use std::io;
use std::net::SocketAddr;
use std::collections::VecDeque;
use std::time::Duration;
use actix::Arbiter;
use trust_dns_resolver::ResolverFuture;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::lookup_ip::LookupIpFuture;
use futures::{Async, Future, Poll};
use tokio_core::reactor::Timeout;
use tokio_core::net::{TcpStream, TcpStreamNew};
#[derive(Fail, Debug)]
pub enum TcpConnectorError {
/// Failed to resolve the hostname
#[fail(display = "Failed resolving hostname: {}", _0)]
Dns(String),
/// Address is invalid
#[fail(display = "Invalid input: {}", _0)]
InvalidInput(&'static str),
/// Connecting took too long
#[fail(display = "Timeout out while establishing connection")]
Timeout,
/// Connection io error
#[fail(display = "{}", _0)]
IoError(io::Error),
}
pub struct TcpConnector {
lookup: Option<LookupIpFuture>,
port: u16,
ips: VecDeque<SocketAddr>,
error: Option<TcpConnectorError>,
timeout: Timeout,
stream: Option<TcpStreamNew>,
}
impl TcpConnector {
pub fn new<S: AsRef<str>>(addr: S, port: u16, timeout: Duration) -> TcpConnector {
// try to parse as a regular SocketAddr first
if let Ok(addr) = addr.as_ref().parse() {
let mut ips = VecDeque::new();
ips.push_back(addr);
TcpConnector {
lookup: None,
port: port,
ips: ips,
error: None,
stream: None,
timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() }
} else {
// we need to do dns resolution
let resolve = match ResolverFuture::from_system_conf(Arbiter::handle()) {
Ok(resolve) => resolve,
Err(err) => {
warn!("Can not create system dns resolver: {}", err);
ResolverFuture::new(
ResolverConfig::default(),
ResolverOpts::default(),
Arbiter::handle())
}
};
TcpConnector {
lookup: Some(resolve.lookup_ip(addr.as_ref())),
port: port,
ips: VecDeque::new(),
error: None,
stream: None,
timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() }
}
}
}
impl Future for TcpConnector {
type Item = TcpStream;
type Error = TcpConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(err) = self.error.take() {
Err(err)
} else {
// timeout
if let Ok(Async::Ready(_)) = self.timeout.poll() {
return Err(TcpConnectorError::Timeout)
}
// lookip ips
if let Some(mut lookup) = self.lookup.take() {
match lookup.poll() {
Ok(Async::NotReady) => {
self.lookup = Some(lookup);
return Ok(Async::NotReady)
},
Ok(Async::Ready(ips)) => {
let port = self.port;
let ips = ips.iter().map(|ip| SocketAddr::new(ip, port));
self.ips.extend(ips);
if self.ips.is_empty() {
return Err(TcpConnectorError::Dns(
"Expect at least one A dns record".to_owned()))
}
},
Err(err) => return Err(TcpConnectorError::Dns(format!("{}", err))),
}
}
// connect
loop {
if let Some(mut new) = self.stream.take() {
match new.poll() {
Ok(Async::Ready(sock)) =>
return Ok(Async::Ready(sock)),
Ok(Async::NotReady) => {
self.stream = Some(new);
return Ok(Async::NotReady)
},
Err(err) => {
if self.ips.is_empty() {
return Err(TcpConnectorError::IoError(err))
}
}
}
}
// try to connect
let addr = self.ips.pop_front().unwrap();
self.stream = Some(TcpStream::connect(&addr, Arbiter::handle()));
}
}
}
}

View File

@ -61,8 +61,6 @@ mod context;
mod mask; mod mask;
mod client; mod client;
mod connect;
use self::frame::Frame; use self::frame::Frame;
use self::proto::{hash_key, OpCode}; use self::proto::{hash_key, OpCode};
pub use self::proto::CloseCode; pub use self::proto::CloseCode;