mirror of
https://github.com/actix/actix-extras.git
synced 2024-12-01 02:44:37 +01:00
fmt
This commit is contained in:
parent
0b42cae082
commit
a2b170fec9
@ -19,36 +19,28 @@ use tokio_timer::Delay;
|
|||||||
#[cfg(feature = "alpn")]
|
#[cfg(feature = "alpn")]
|
||||||
use {
|
use {
|
||||||
openssl::ssl::{Error as SslError, SslConnector, SslMethod},
|
openssl::ssl::{Error as SslError, SslConnector, SslMethod},
|
||||||
tokio_openssl::SslConnectorExt
|
tokio_openssl::SslConnectorExt,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||||
use {
|
use {
|
||||||
native_tls::{Error as SslError, TlsConnector as NativeTlsConnector},
|
native_tls::{Error as SslError, TlsConnector as NativeTlsConnector},
|
||||||
tokio_tls::TlsConnector as SslConnector
|
tokio_tls::TlsConnector as SslConnector,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
use {
|
use {
|
||||||
rustls::ClientConfig,
|
rustls::ClientConfig, std::io::Error as SslError, std::sync::Arc,
|
||||||
std::io::Error as SslError,
|
tokio_rustls::ClientConfigExt, webpki::DNSNameRef, webpki_roots,
|
||||||
std::sync::Arc,
|
|
||||||
tokio_rustls::ClientConfigExt,
|
|
||||||
webpki::DNSNameRef,
|
|
||||||
webpki_roots,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
type SslConnector = Arc<ClientConfig>;
|
type SslConnector = Arc<ClientConfig>;
|
||||||
|
|
||||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||||
@ -255,17 +247,19 @@ impl Default for ClientConnector {
|
|||||||
fn default() -> ClientConnector {
|
fn default() -> ClientConnector {
|
||||||
let connector = {
|
let connector = {
|
||||||
#[cfg(all(feature = "alpn"))]
|
#[cfg(all(feature = "alpn"))]
|
||||||
{ SslConnector::builder(SslMethod::tls()).unwrap().build() }
|
{
|
||||||
|
SslConnector::builder(SslMethod::tls()).unwrap().build()
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
#[cfg(all(feature = "tls", not(feature = "alpn")))]
|
||||||
{ NativeTlsConnector::builder().build().unwrap().into() }
|
{
|
||||||
|
NativeTlsConnector::builder().build().unwrap().into()
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
{
|
{
|
||||||
let mut config = ClientConfig::new();
|
let mut config = ClientConfig::new();
|
||||||
config
|
config
|
||||||
@ -275,7 +269,9 @@ impl Default for ClientConnector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
#[cfg(not(any(feature = "alpn", feature = "tls", feature = "rust-tls")))]
|
||||||
{ () }
|
{
|
||||||
|
()
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ClientConnector::with_connector_impl(connector)
|
ClientConnector::with_connector_impl(connector)
|
||||||
@ -327,12 +323,10 @@ impl ClientConnector {
|
|||||||
Self::with_connector_impl(connector)
|
Self::with_connector_impl(connector)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
/// Create `ClientConnector` actor with custom `SslConnector` instance.
|
||||||
///
|
///
|
||||||
/// By default `ClientConnector` uses very a simple SSL configuration.
|
/// By default `ClientConnector` uses very a simple SSL configuration.
|
||||||
@ -382,12 +376,10 @@ impl ClientConnector {
|
|||||||
Self::with_connector_impl(Arc::new(connector))
|
Self::with_connector_impl(Arc::new(connector))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "tls",
|
feature = "tls",
|
||||||
not(any(feature = "alpn", feature = "rust-tls"))
|
not(any(feature = "alpn", feature = "rust-tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
pub fn with_connector(connector: SslConnector) -> ClientConnector {
|
||||||
// keep level of indirection for docstrings matching featureflags
|
// keep level of indirection for docstrings matching featureflags
|
||||||
Self::with_connector_impl(connector)
|
Self::with_connector_impl(connector)
|
||||||
@ -772,12 +764,10 @@ impl ClientConnector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(
|
#[cfg(all(
|
||||||
all(
|
|
||||||
feature = "rust-tls",
|
feature = "rust-tls",
|
||||||
not(any(feature = "alpn", feature = "tls"))
|
not(any(feature = "alpn", feature = "tls"))
|
||||||
)
|
))]
|
||||||
)]
|
|
||||||
match res {
|
match res {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let _ = waiter.tx.send(Err(err.into()));
|
let _ = waiter.tx.send(Err(err.into()));
|
||||||
@ -1263,7 +1253,7 @@ impl AsyncWrite for Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "tls")]
|
#[cfg(feature = "tls")]
|
||||||
use tokio_tls::{TlsStream};
|
use tokio_tls::TlsStream;
|
||||||
|
|
||||||
#[cfg(feature = "tls")]
|
#[cfg(feature = "tls")]
|
||||||
/// This is temp solution untile actix-net migration
|
/// This is temp solution untile actix-net migration
|
||||||
|
@ -50,7 +50,9 @@ impl HttpResponseParser {
|
|||||||
}
|
}
|
||||||
Async::NotReady => {
|
Async::NotReady => {
|
||||||
if buf.capacity() >= MAX_BUFFER_SIZE {
|
if buf.capacity() >= MAX_BUFFER_SIZE {
|
||||||
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
|
return Err(HttpResponseParserError::Error(
|
||||||
|
ParseError::TooLarge,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
// Parser needs more data.
|
// Parser needs more data.
|
||||||
}
|
}
|
||||||
@ -63,9 +65,7 @@ impl HttpResponseParser {
|
|||||||
}
|
}
|
||||||
Ok(Async::Ready(_)) => (),
|
Ok(Async::Ready(_)) => (),
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Err(err) => {
|
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
|
||||||
return Err(HttpResponseParserError::Error(err.into()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -236,7 +236,6 @@ macro_rules! FROM_STR {
|
|||||||
($type:ty) => {
|
($type:ty) => {
|
||||||
impl FromParam for $type {
|
impl FromParam for $type {
|
||||||
type Err = InternalError<<$type as FromStr>::Err>;
|
type Err = InternalError<<$type as FromStr>::Err>;
|
||||||
|
|
||||||
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
fn from_param(val: &str) -> Result<Self, Self::Err> {
|
||||||
<$type as FromStr>::from_str(val)
|
<$type as FromStr>::from_str(val)
|
||||||
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
|
.map_err(|e| InternalError::new(e, StatusCode::BAD_REQUEST))
|
||||||
|
@ -160,8 +160,9 @@ where
|
|||||||
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
|
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
|
||||||
match kind {
|
match kind {
|
||||||
ProtocolKind::Http1 => {
|
ProtocolKind::Http1 => {
|
||||||
self.proto =
|
self.proto = Some(HttpProtocol::H1(h1::Http1::new(
|
||||||
Some(HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf, is_eof)));
|
settings, io, addr, buf, is_eof,
|
||||||
|
)));
|
||||||
return self.poll();
|
return self.poll();
|
||||||
}
|
}
|
||||||
ProtocolKind::Http2 => {
|
ProtocolKind::Http2 => {
|
||||||
|
@ -94,7 +94,11 @@ where
|
|||||||
buf: BytesMut, is_eof: bool,
|
buf: BytesMut, is_eof: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Http1 {
|
Http1 {
|
||||||
flags: if is_eof { Flags::READ_DISCONNECTED } else { Flags::KEEPALIVE },
|
flags: if is_eof {
|
||||||
|
Flags::READ_DISCONNECTED
|
||||||
|
} else {
|
||||||
|
Flags::KEEPALIVE
|
||||||
|
},
|
||||||
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
||||||
decoder: H1Decoder::new(),
|
decoder: H1Decoder::new(),
|
||||||
payload: None,
|
payload: None,
|
||||||
@ -118,8 +122,11 @@ where
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn can_read(&self) -> bool {
|
fn can_read(&self) -> bool {
|
||||||
if self.flags.intersects(Flags::ERROR | Flags::READ_DISCONNECTED) {
|
if self
|
||||||
return false
|
.flags
|
||||||
|
.intersects(Flags::ERROR | Flags::READ_DISCONNECTED)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref info) = self.payload {
|
if let Some(ref info) = self.payload {
|
||||||
@ -171,8 +178,9 @@ where
|
|||||||
// shutdown
|
// shutdown
|
||||||
if self.flags.contains(Flags::SHUTDOWN) {
|
if self.flags.contains(Flags::SHUTDOWN) {
|
||||||
if self.flags.intersects(
|
if self.flags.intersects(
|
||||||
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED) {
|
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED,
|
||||||
return Ok(Async::Ready(()))
|
) {
|
||||||
|
return Ok(Async::Ready(()));
|
||||||
}
|
}
|
||||||
match self.stream.poll_completed(true) {
|
match self.stream.poll_completed(true) {
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
@ -240,7 +248,8 @@ where
|
|||||||
let mut idx = 0;
|
let mut idx = 0;
|
||||||
while idx < self.tasks.len() {
|
while idx < self.tasks.len() {
|
||||||
// only one task can do io operation in http/1
|
// only one task can do io operation in http/1
|
||||||
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF)
|
if !io
|
||||||
|
&& !self.tasks[idx].flags.contains(EntryFlags::EOF)
|
||||||
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
|
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
|
||||||
{
|
{
|
||||||
// io is corrupted, send buffer
|
// io is corrupted, send buffer
|
||||||
@ -291,8 +300,9 @@ where
|
|||||||
} else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) {
|
} else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) {
|
||||||
match self.tasks[idx].pipe.poll_completed() {
|
match self.tasks[idx].pipe.poll_completed() {
|
||||||
Ok(Async::NotReady) => (),
|
Ok(Async::NotReady) => (),
|
||||||
Ok(Async::Ready(_)) =>
|
Ok(Async::Ready(_)) => {
|
||||||
self.tasks[idx].flags.insert(EntryFlags::FINISHED),
|
self.tasks[idx].flags.insert(EntryFlags::FINISHED)
|
||||||
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.notify_disconnect();
|
self.notify_disconnect();
|
||||||
self.tasks[idx].flags.insert(EntryFlags::ERROR);
|
self.tasks[idx].flags.insert(EntryFlags::ERROR);
|
||||||
@ -319,9 +329,7 @@ where
|
|||||||
// check stream state
|
// check stream state
|
||||||
if self.flags.contains(Flags::STARTED) {
|
if self.flags.contains(Flags::STARTED) {
|
||||||
match self.stream.poll_completed(false) {
|
match self.stream.poll_completed(false) {
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
return Ok(Async::NotReady)
|
|
||||||
},
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("Error sending data: {}", err);
|
debug!("Error sending data: {}", err);
|
||||||
self.notify_disconnect();
|
self.notify_disconnect();
|
||||||
@ -458,11 +466,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
if self.flags.contains(Flags::READ_DISCONNECTED) && self.tasks.is_empty() {
|
if self.flags.contains(Flags::READ_DISCONNECTED)
|
||||||
|
&& self.tasks.is_empty()
|
||||||
|
{
|
||||||
self.client_disconnect();
|
self.client_disconnect();
|
||||||
}
|
}
|
||||||
break
|
break;
|
||||||
},
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.flags.insert(Flags::ERROR);
|
self.flags.insert(Flags::ERROR);
|
||||||
if let Some(mut payload) = self.payload.take() {
|
if let Some(mut payload) = self.payload.take() {
|
||||||
@ -631,7 +641,8 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_req_parse2() {
|
fn test_req_parse2() {
|
||||||
let buf = Buffer::new("");
|
let buf = Buffer::new("");
|
||||||
let readbuf = BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
|
let readbuf =
|
||||||
|
BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
|
||||||
let settings = Rc::new(wrk_settings());
|
let settings = Rc::new(wrk_settings());
|
||||||
|
|
||||||
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true);
|
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true);
|
||||||
|
Loading…
Reference in New Issue
Block a user