mirror of
https://github.com/actix/actix-extras.git
synced 2025-07-01 12:15:08 +02:00
clippy fmt
This commit is contained in:
@ -166,9 +166,9 @@ impl H1Decoder {
|
||||
{
|
||||
true
|
||||
} else {
|
||||
version == Version::HTTP_11
|
||||
&& !(conn.contains("close")
|
||||
|| conn.contains("upgrade"))
|
||||
version == Version::HTTP_11 && !(conn
|
||||
.contains("close")
|
||||
|| conn.contains("upgrade"))
|
||||
}
|
||||
} else {
|
||||
false
|
||||
|
@ -152,8 +152,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
||||
let reason = msg.reason().as_bytes();
|
||||
if let Body::Binary(ref bytes) = body {
|
||||
buffer.reserve(
|
||||
256
|
||||
+ msg.headers().len() * AVERAGE_HEADER_SIZE
|
||||
256 + msg.headers().len() * AVERAGE_HEADER_SIZE
|
||||
+ bytes.len()
|
||||
+ reason.len(),
|
||||
);
|
||||
|
@ -115,46 +115,51 @@ where
|
||||
if disconnected {
|
||||
item.flags.insert(EntryFlags::EOF);
|
||||
} else {
|
||||
let retry = item.payload.need_read() == PayloadStatus::Read;
|
||||
loop {
|
||||
match item.task.poll_io(&mut item.stream) {
|
||||
Ok(Async::Ready(ready)) => {
|
||||
if ready {
|
||||
let retry = item.payload.need_read() == PayloadStatus::Read;
|
||||
loop {
|
||||
match item.task.poll_io(&mut item.stream) {
|
||||
Ok(Async::Ready(ready)) => {
|
||||
if ready {
|
||||
item.flags.insert(
|
||||
EntryFlags::EOF | EntryFlags::FINISHED,
|
||||
);
|
||||
} else {
|
||||
item.flags.insert(EntryFlags::EOF);
|
||||
}
|
||||
not_ready = false;
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
if item.payload.need_read()
|
||||
== PayloadStatus::Read
|
||||
&& !retry
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Unhandled error: {}", err);
|
||||
item.flags.insert(
|
||||
EntryFlags::EOF | EntryFlags::FINISHED,
|
||||
EntryFlags::EOF
|
||||
| EntryFlags::ERROR
|
||||
| EntryFlags::WRITE_DONE,
|
||||
);
|
||||
} else {
|
||||
item.flags.insert(EntryFlags::EOF);
|
||||
}
|
||||
not_ready = false;
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
if item.payload.need_read() == PayloadStatus::Read
|
||||
&& !retry
|
||||
{
|
||||
continue;
|
||||
item.stream.reset(Reason::INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Unhandled error: {}", err);
|
||||
item.flags.insert(
|
||||
EntryFlags::EOF
|
||||
| EntryFlags::ERROR
|
||||
| EntryFlags::WRITE_DONE,
|
||||
);
|
||||
item.stream.reset(Reason::INTERNAL_ERROR);
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if item.flags.contains(EntryFlags::EOF) && !item.flags.contains(EntryFlags::FINISHED) {
|
||||
|
||||
if item.flags.contains(EntryFlags::EOF)
|
||||
&& !item.flags.contains(EntryFlags::FINISHED)
|
||||
{
|
||||
match item.task.poll_completed() {
|
||||
Ok(Async::NotReady) => (),
|
||||
Ok(Async::Ready(_)) => {
|
||||
item.flags.insert(EntryFlags::FINISHED | EntryFlags::WRITE_DONE);
|
||||
item.flags.insert(
|
||||
EntryFlags::FINISHED | EntryFlags::WRITE_DONE,
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
item.flags.insert(
|
||||
|
@ -250,9 +250,7 @@ impl<H: 'static> Writer for H2Writer<H> {
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -403,19 +403,24 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: IntoHttpHandler> Into<(Box<Service>, Vec<(Token, net::TcpListener)>)> for HttpServer<H> {
|
||||
impl<H: IntoHttpHandler> Into<(Box<Service>, Vec<(Token, net::TcpListener)>)>
|
||||
for HttpServer<H>
|
||||
{
|
||||
fn into(mut self) -> (Box<Service>, Vec<(Token, net::TcpListener)>) {
|
||||
let sockets: Vec<_> = mem::replace(&mut self.sockets, Vec::new())
|
||||
.into_iter()
|
||||
.map(|item| (item.token, item.lst))
|
||||
.collect();
|
||||
|
||||
(Box::new(HttpService {
|
||||
factory: self.factory,
|
||||
host: self.host,
|
||||
keep_alive: self.keep_alive,
|
||||
handlers: self.handlers,
|
||||
}), sockets)
|
||||
(
|
||||
Box::new(HttpService {
|
||||
factory: self.factory,
|
||||
host: self.host,
|
||||
keep_alive: self.keep_alive,
|
||||
handlers: self.handlers,
|
||||
}),
|
||||
sockets,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,12 +125,12 @@ mod h1writer;
|
||||
mod h2;
|
||||
mod h2writer;
|
||||
pub(crate) mod helpers;
|
||||
mod http;
|
||||
pub(crate) mod input;
|
||||
pub(crate) mod message;
|
||||
pub(crate) mod output;
|
||||
mod server;
|
||||
pub(crate) mod settings;
|
||||
mod http;
|
||||
mod ssl;
|
||||
mod worker;
|
||||
|
||||
@ -138,12 +138,12 @@ use actix::Message;
|
||||
|
||||
pub use self::message::Request;
|
||||
|
||||
pub use self::http::HttpServer;
|
||||
#[doc(hidden)]
|
||||
pub use self::server::{
|
||||
ConnectionRateTag, ConnectionTag, Connections, Server, Service, ServiceHandler,
|
||||
};
|
||||
pub use self::settings::ServerSettings;
|
||||
pub use self::http::HttpServer;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use self::ssl::*;
|
||||
|
@ -273,10 +273,9 @@ impl Output {
|
||||
|
||||
let enc = match encoding {
|
||||
#[cfg(feature = "flate2")]
|
||||
ContentEncoding::Deflate => ContentEncoder::Deflate(ZlibEncoder::new(
|
||||
transfer,
|
||||
Compression::fast(),
|
||||
)),
|
||||
ContentEncoding::Deflate => {
|
||||
ContentEncoder::Deflate(ZlibEncoder::new(transfer, Compression::fast()))
|
||||
}
|
||||
#[cfg(feature = "flate2")]
|
||||
ContentEncoding::Gzip => {
|
||||
ContentEncoder::Gzip(GzEncoder::new(transfer, Compression::fast()))
|
||||
|
@ -1,16 +1,21 @@
|
||||
use std::{mem, net};
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
|
||||
use std::{mem, net};
|
||||
|
||||
use num_cpus;
|
||||
use futures::{Future, Stream, Sink};
|
||||
use futures::sync::{mpsc, mpsc::unbounded};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use num_cpus;
|
||||
|
||||
use actix::{fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext,
|
||||
Context, Handler, Response, System, StreamHandler, WrapFuture};
|
||||
use actix::{
|
||||
fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
|
||||
Response, StreamHandler, System, WrapFuture,
|
||||
};
|
||||
|
||||
use super::accept::{AcceptLoop, AcceptNotify, Command};
|
||||
use super::worker::{StopWorker, Worker, WorkerClient, Conn};
|
||||
use super::worker::{Conn, StopWorker, Worker, WorkerClient};
|
||||
use super::{PauseServer, ResumeServer, StopServer, Token};
|
||||
|
||||
#[doc(hidden)]
|
||||
@ -39,7 +44,9 @@ impl Service for Box<Service> {
|
||||
/// TCP connections.
|
||||
pub trait ServiceHandler {
|
||||
/// Handle incoming stream
|
||||
fn handle(&mut self, token: Token, io: net::TcpStream, peer: Option<net::SocketAddr>);
|
||||
fn handle(
|
||||
&mut self, token: Token, io: net::TcpStream, peer: Option<net::SocketAddr>,
|
||||
);
|
||||
|
||||
/// Shutdown open handlers
|
||||
fn shutdown(&self, _: bool) {}
|
||||
@ -156,7 +163,7 @@ impl Server {
|
||||
/// Add new service to server
|
||||
pub fn service<T>(mut self, srv: T) -> Self
|
||||
where
|
||||
T: Into<(Box<Service>, Vec<(Token, net::TcpListener)>)>
|
||||
T: Into<(Box<Service>, Vec<(Token, net::TcpListener)>)>,
|
||||
{
|
||||
let (srv, sockets) = srv.into();
|
||||
self.services.push(srv);
|
||||
@ -213,8 +220,9 @@ impl Server {
|
||||
info!("Starting server on http://{:?}", s.1.local_addr().ok());
|
||||
}
|
||||
}
|
||||
let rx = self.accept.start(
|
||||
mem::replace(&mut self.sockets, Vec::new()), workers);
|
||||
let rx = self
|
||||
.accept
|
||||
.start(mem::replace(&mut self.sockets, Vec::new()), workers);
|
||||
|
||||
// start http server actor
|
||||
let signals = self.subscribe_to_signals();
|
||||
@ -242,7 +250,9 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) {
|
||||
fn start_worker(
|
||||
&self, idx: usize, notify: AcceptNotify,
|
||||
) -> (Addr<Worker>, WorkerClient) {
|
||||
let (tx, rx) = unbounded::<Conn<net::TcpStream>>();
|
||||
let conns = Connections::new(notify, self.maxconn, self.maxconnrate);
|
||||
let worker = WorkerClient::new(idx, tx, conns.clone());
|
||||
@ -250,7 +260,10 @@ impl Server {
|
||||
|
||||
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
|
||||
ctx.add_message_stream(rx);
|
||||
let handlers: Vec<_> = services.into_iter().map(|s| s.create(conns.clone())).collect();
|
||||
let handlers: Vec<_> = services
|
||||
.into_iter()
|
||||
.map(|s| s.create(conns.clone()))
|
||||
.collect();
|
||||
Worker::new(conns, handlers)
|
||||
});
|
||||
|
||||
@ -258,8 +271,7 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for Server
|
||||
{
|
||||
impl Actor for Server {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
@ -391,7 +403,8 @@ impl StreamHandler<ServerCommand, ()> for Server {
|
||||
break;
|
||||
}
|
||||
|
||||
let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify());
|
||||
let (addr, worker) =
|
||||
self.start_worker(new_idx, self.accept.get_notify());
|
||||
self.workers.push((new_idx, addr));
|
||||
self.accept.send(Command::Worker(worker));
|
||||
}
|
||||
@ -413,14 +426,15 @@ impl Connections {
|
||||
0
|
||||
};
|
||||
|
||||
Connections (
|
||||
Arc::new(ConnectionsInner {
|
||||
notify,
|
||||
maxconn, maxconnrate,
|
||||
maxconn_low, maxconnrate_low,
|
||||
conn: AtomicUsize::new(0),
|
||||
connrate: AtomicUsize::new(0),
|
||||
}))
|
||||
Connections(Arc::new(ConnectionsInner {
|
||||
notify,
|
||||
maxconn,
|
||||
maxconnrate,
|
||||
maxconn_low,
|
||||
maxconnrate_low,
|
||||
conn: AtomicUsize::new(0),
|
||||
connrate: AtomicUsize::new(0),
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) fn available(&self) -> bool {
|
||||
@ -473,7 +487,6 @@ impl ConnectionsInner {
|
||||
self.notify.notify();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Type responsible for max connection stat.
|
||||
@ -498,7 +511,7 @@ impl Drop for ConnectionTag {
|
||||
/// Type responsible for max connection rate stat.
|
||||
///
|
||||
/// Max connections rate stat get updated on drop.
|
||||
pub struct ConnectionRateTag (Arc<ConnectionsInner>);
|
||||
pub struct ConnectionRateTag(Arc<ConnectionsInner>);
|
||||
|
||||
impl ConnectionRateTag {
|
||||
fn new(inner: Arc<ConnectionsInner>) -> Self {
|
||||
|
@ -6,7 +6,7 @@ pub use self::openssl::OpensslAcceptor;
|
||||
#[cfg(feature = "tls")]
|
||||
mod nativetls;
|
||||
#[cfg(feature = "tls")]
|
||||
pub use self::nativetls::{TlsStream, NativeTlsAcceptor};
|
||||
pub use self::nativetls::{NativeTlsAcceptor, TlsStream};
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
mod rustls;
|
||||
|
@ -2,7 +2,7 @@ use std::net::Shutdown;
|
||||
use std::{io, time};
|
||||
|
||||
use futures::{Async, Future, Poll};
|
||||
use native_tls::{self, TlsAcceptor, HandshakeError};
|
||||
use native_tls::{self, HandshakeError, TlsAcceptor};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use server::{AcceptorService, IoStream};
|
||||
@ -29,14 +29,16 @@ pub struct TlsStream<S> {
|
||||
|
||||
/// Future returned from `NativeTlsAcceptor::accept` which will resolve
|
||||
/// once the accept handshake has finished.
|
||||
pub struct Accept<S>{
|
||||
pub struct Accept<S> {
|
||||
inner: Option<Result<native_tls::TlsStream<S>, HandshakeError<S>>>,
|
||||
}
|
||||
|
||||
impl NativeTlsAcceptor {
|
||||
/// Create `NativeTlsAcceptor` instance
|
||||
pub fn new(acceptor: TlsAcceptor) -> Self {
|
||||
NativeTlsAcceptor { acceptor: acceptor.into() }
|
||||
NativeTlsAcceptor {
|
||||
acceptor: acceptor.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,7 +51,9 @@ impl<Io: IoStream> AcceptorService<Io> for NativeTlsAcceptor {
|
||||
}
|
||||
|
||||
fn accept(&self, io: Io) -> Self::Future {
|
||||
Accept { inner: Some(self.acceptor.accept(io)) }
|
||||
Accept {
|
||||
inner: Some(self.acceptor.accept(io)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -78,18 +82,19 @@ impl<Io: IoStream> Future for Accept<Io> {
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.inner.take().expect("cannot poll MidHandshake twice") {
|
||||
Ok(stream) => Ok(TlsStream { inner: stream }.into()),
|
||||
Err(HandshakeError::Failure(e)) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||
Err(HandshakeError::WouldBlock(s)) => {
|
||||
match s.handshake() {
|
||||
Ok(stream) => Ok(TlsStream { inner: stream }.into()),
|
||||
Err(HandshakeError::Failure(e)) =>
|
||||
Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||
Err(HandshakeError::WouldBlock(s)) => {
|
||||
self.inner = Some(Err(HandshakeError::WouldBlock(s)));
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
Err(HandshakeError::Failure(e)) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
Err(HandshakeError::WouldBlock(s)) => match s.handshake() {
|
||||
Ok(stream) => Ok(TlsStream { inner: stream }.into()),
|
||||
Err(HandshakeError::Failure(e)) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
Err(HandshakeError::WouldBlock(s)) => {
|
||||
self.inner = Some(Err(HandshakeError::WouldBlock(s)));
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -124,9 +129,7 @@ impl<S: io::Read + io::Write> io::Write for TlsStream<S> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite> AsyncRead for TlsStream<S> {
|
||||
}
|
||||
impl<S: AsyncRead + AsyncWrite> AsyncRead for TlsStream<S> {}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite> AsyncWrite for TlsStream<S> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
@ -137,4 +140,4 @@ impl<S: AsyncRead + AsyncWrite> AsyncWrite for TlsStream<S> {
|
||||
}
|
||||
self.inner.get_mut().shutdown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user