From 8fe7ce533c2b427cdfbd21882ec35982a2f50960 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 6 Dec 2018 14:04:42 -0800 Subject: [PATCH] convert to edition 2018 --- Cargo.toml | 1 + examples/basic.rs | 3 ++- examples/ssl.rs | 3 ++- src/codec/framed_read.rs | 13 ++++----- src/codec/framed_write.rs | 12 +++++---- src/connector.rs | 14 +++++----- src/either.rs | 2 +- src/framed.rs | 34 +++++++++++------------- src/inflight.rs | 2 +- src/lib.rs | 49 +++------------------------------- src/server/accept.rs | 3 ++- src/server/config.rs | 7 ++--- src/server/server.rs | 5 ++-- src/server/services.rs | 5 ++-- src/server/worker.rs | 55 +++++++++++++++++++-------------------- src/service/and_then.rs | 6 ++--- src/service/apply.rs | 2 +- src/service/then.rs | 6 ++--- src/stream.rs | 6 +++++ src/time.rs | 2 +- src/timeout.rs | 9 +++---- 21 files changed, 103 insertions(+), 136 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2acf028c..3194b6ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ documentation = "https://docs.rs/actix-net/" categories = ["network-programming", "asynchronous"] license = "MIT/Apache-2.0" exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] +edition = "2018" [package.metadata.docs.rs] features = ["ssl", "tls", "rust-tls"] diff --git a/examples/basic.rs b/examples/basic.rs index 8162fcc6..f31b6d76 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -80,7 +80,8 @@ fn main() { future::ok(()) }) }, - ).unwrap() + ) + .unwrap() .start(); sys.run(); diff --git a/examples/ssl.rs b/examples/ssl.rs index 1d43f24d..62e3aac7 100644 --- a/examples/ssl.rs +++ b/examples/ssl.rs @@ -61,7 +61,8 @@ fn main() { println!("got ssl connection {:?}", num); future::ok(()) }) - }).unwrap() + }) + .unwrap() .start(); sys.run(); diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 3813d85e..f54b2c23 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -1,7 +1,8 @@ use std::fmt; use bytes::BytesMut; -use futures::{Async, Poll, Sink, StartSend, Stream}; +use futures::{try_ready, Async, Poll, Sink, StartSend, Stream}; +use log::trace; use tokio_codec::Decoder; use tokio_io::AsyncRead; @@ -133,7 +134,7 @@ where pub fn framed_read2(inner: T) -> FramedRead2 { FramedRead2 { - inner: inner, + inner, eof: false, is_readable: false, buffer: BytesMut::with_capacity(INITIAL_CAPACITY), @@ -146,9 +147,9 @@ pub fn framed_read2_with_buffer(inner: T, mut buf: BytesMut) -> FramedRead2 0, + is_readable: !buf.is_empty(), buffer: buf, } } @@ -187,13 +188,13 @@ where // readable again, at which point the stream is terminated. if self.is_readable { if self.eof { - let frame = try!(self.inner.decode_eof(&mut self.buffer)); + let frame = self.inner.decode_eof(&mut self.buffer)?; return Ok(Async::Ready(frame)); } trace!("attempting to decode a frame"); - if let Some(frame) = try!(self.inner.decode(&mut self.buffer)) { + if let Some(frame) = self.inner.decode(&mut self.buffer)? { trace!("frame decoded from buffer"); return Ok(Async::Ready(Some(frame))); } diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 8d4fe15f..6b5c2b19 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -2,7 +2,8 @@ use std::fmt; use std::io::{self, Read}; use bytes::BytesMut; -use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; +use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; +use log::trace; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -111,7 +112,7 @@ where } fn close(&mut self) -> Poll<(), Self::SinkError> { - Ok(try!(self.inner.close())) + Ok(self.inner.close()?) } } @@ -254,7 +255,8 @@ where io::ErrorKind::WriteZero, "failed to \ write frame to transport", - ).into()); + ) + .into()); } // TODO: Add a way to `bytes` to do this w/o returning the drained @@ -266,12 +268,12 @@ where try_ready!(self.inner.poll_flush()); trace!("framed transport flushed"); - return Ok(Async::Ready(())); + Ok(Async::Ready(())) } fn close(&mut self) -> Poll<(), Self::SinkError> { try_ready!(self.poll_complete()); - Ok(try!(self.inner.shutdown())) + Ok(self.inner.shutdown()?) } } diff --git a/src/connector.rs b/src/connector.rs index 99c91e2d..e1ede0e7 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -5,7 +5,7 @@ use std::time::Duration; use std::{fmt, io}; use futures::future::{ok, Either, FutureResult}; -use futures::{Async, Future, Poll}; +use futures::{try_ready, Async, Future, Poll}; use tokio_tcp::{ConnectFuture, TcpStream}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; @@ -119,8 +119,8 @@ impl Connect { impl RequestHost for Connect { fn host(&self) -> &str { match self.kind { - ConnectKind::Host { ref host, port: _ } => host, - ConnectKind::Addr { ref host, addr: _ } => host, + ConnectKind::Host { ref host, .. } => host, + ConnectKind::Addr { ref host, .. } => host, } } } @@ -128,8 +128,8 @@ impl RequestHost for Connect { impl RequestPort for Connect { fn port(&self) -> u16 { match self.kind { - ConnectKind::Host { host: _, port } => port, - ConnectKind::Addr { host: _, addr } => addr.port(), + ConnectKind::Host { port, .. } => port, + ConnectKind::Addr { addr, .. } => addr.port(), } } } @@ -206,11 +206,11 @@ impl Service for Connector { fn call(&mut self, req: Connect) -> Self::Future { match req.kind { - ConnectKind::Host { host: _, port: _ } => Either::A(ConnectorFuture { + ConnectKind::Host { .. } => Either::A(ConnectorFuture { fut: self.resolver.call(req), fut2: None, }), - ConnectKind::Addr { host: _, addr } => { + ConnectKind::Addr { addr, .. } => { let mut addrs = VecDeque::new(); addrs.push_back(addr.ip()); Either::B(ConnectorTcpFuture { diff --git a/src/either.rs b/src/either.rs index 677ccf45..d1baab0c 100644 --- a/src/either.rs +++ b/src/either.rs @@ -1,5 +1,5 @@ //! Contains `Either` service and related types and functions. -use futures::{future, Async, Future, Poll}; +use futures::{future, try_ready, Async, Future, Poll}; use super::service::{NewService, Service}; diff --git a/src/framed.rs b/src/framed.rs index 9ea8f71f..b66a5dc0 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -9,8 +9,8 @@ use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; -use codec::Framed; -use service::{IntoNewService, IntoService, NewService, Service}; +use crate::codec::Framed; +use crate::service::{IntoNewService, IntoService, NewService, Service}; type Request = ::Item; type Response = ::Item; @@ -300,10 +300,10 @@ where } } } - Ok(Async::NotReady) => return false, + Ok(Async::NotReady) => false, Err(err) => { self.state = TransportState::Error(FramedTransportError::Service(err)); - return true; + true } } } @@ -387,26 +387,22 @@ where fn poll(&mut self) -> Poll { match mem::replace(&mut self.state, TransportState::Processing) { TransportState::Processing => { - if self.poll_service() { - return self.poll(); + if self.poll_service() || self.poll_response() { + self.poll() + } else { + Ok(Async::NotReady) } - if self.poll_response() { - return self.poll(); - } - return Ok(Async::NotReady); } TransportState::Error(err) => { - if self.poll_response() { - return Err(err); + if self.poll_response() || self.flushed { + Err(err) + } else { + self.state = TransportState::Error(err); + Ok(Async::NotReady) } - if self.flushed { - return Err(err); - } - self.state = TransportState::Error(err); - return Ok(Async::NotReady); } - TransportState::EncoderError(err) => return Err(err), - TransportState::Stopping => return Ok(Async::Ready(())), + TransportState::EncoderError(err) => Err(err), + TransportState::Stopping => Ok(Async::Ready(())), } } } diff --git a/src/inflight.rs b/src/inflight.rs index acda9dfb..9e1b35f2 100644 --- a/src/inflight.rs +++ b/src/inflight.rs @@ -1,4 +1,4 @@ -use futures::{Async, Future, Poll}; +use futures::{try_ready, Async, Future, Poll}; use super::counter::{Counter, CounterGuard}; use super::service::{IntoNewService, IntoService, NewService, Service}; diff --git a/src/lib.rs b/src/lib.rs index 0fd6e917..fe16a426 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,54 +7,11 @@ //! * `rust-tls` - enables ssl support via `rustls` crate // #![warn(missing_docs)] -#![cfg_attr( - feature = "cargo-clippy", - allow( - declare_interior_mutable_const, - borrow_interior_mutable_const - ) +#![allow( + clippy::declare_interior_mutable_const, + clippy::borrow_interior_mutable_const )] -#[macro_use] -extern crate log; -extern crate bytes; -#[macro_use] -extern crate futures; -extern crate mio; -extern crate net2; -extern crate num_cpus; -extern crate slab; -extern crate tokio; -extern crate tokio_codec; -extern crate tokio_current_thread; -extern crate tokio_io; -extern crate tokio_reactor; -extern crate tokio_tcp; -extern crate tokio_timer; -extern crate tower_service; -extern crate trust_dns_resolver; - -#[allow(unused_imports)] -#[macro_use] -extern crate actix; - -#[cfg(feature = "tls")] -extern crate native_tls; - -#[cfg(feature = "ssl")] -extern crate openssl; -#[cfg(feature = "ssl")] -extern crate tokio_openssl; - -#[cfg(feature = "rust-tls")] -extern crate rustls; -#[cfg(feature = "rust-tls")] -extern crate tokio_rustls; -#[cfg(feature = "rust-tls")] -extern crate webpki; -#[cfg(feature = "rust-tls")] -extern crate webpki_roots; - mod cell; pub mod cloneable; pub mod codec; diff --git a/src/server/accept.rs b/src/server/accept.rs index 759d717e..a902b0fb 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -3,6 +3,7 @@ use std::time::{Duration, Instant}; use std::{io, net, thread}; use futures::{sync::mpsc, Future}; +use log::{error, info}; use mio; use slab::Slab; use tokio_timer::Delay; @@ -134,7 +135,7 @@ fn connection_error(e: &io::Error) -> bool { } impl Accept { - #![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] + #![allow(clippy::too_many_arguments)] pub(crate) fn start( rx: sync_mpsc::Receiver, cmd_reg: mio::Registration, diff --git a/src/server/config.rs b/src/server/config.rs index d959c302..b48f89ea 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -2,10 +2,11 @@ use std::collections::HashMap; use std::{fmt, io, net}; use futures::future::{join_all, Future}; +use log::error; use tokio_tcp::TcpStream; -use counter::CounterGuard; -use service::{IntoNewService, NewService}; +use crate::counter::CounterGuard; +use crate::service::{IntoNewService, NewService}; use super::server::bind_addr; use super::services::{ @@ -111,7 +112,7 @@ impl InternalServiceFactory for ConfiguredService { pub(super) trait ServiceRuntimeConfiguration: Send { fn clone(&self) -> Box; - fn configure(&self, &mut ServiceRuntime); + fn configure(&self, rt: &mut ServiceRuntime); } impl ServiceRuntimeConfiguration for F diff --git a/src/server/server.rs b/src/server/server.rs index e6a251a7..34605362 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -3,6 +3,7 @@ use std::{io, mem, net}; use futures::sync::{mpsc, mpsc::unbounded}; use futures::{Future, Sink, Stream}; +use log::{error, info}; use net2::TcpBuilder; use num_cpus; @@ -284,7 +285,7 @@ impl Server { self.services.iter().map(|v| v.clone_factory()).collect(); Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || { - Worker::start(rx1, rx2, services, avail, timeout.clone()); + Worker::start(rx1, rx2, services, avail, timeout); Ok::<_, ()>(()) })); @@ -376,7 +377,7 @@ impl Handler for Server { } if !self.workers.is_empty() { - Response::async(rx.into_future().map(|_| ()).map_err(|_| ())) + Response::r#async(rx.into_future().map(|_| ()).map_err(|_| ())) } else { // we need to stop system if server was spawned if self.exit { diff --git a/src/server/services.rs b/src/server/services.rs index 151b6200..24bde6c7 100644 --- a/src/server/services.rs +++ b/src/server/services.rs @@ -3,13 +3,14 @@ use std::time::Duration; use futures::future::{err, ok, FutureResult}; use futures::{Future, Poll}; +use log::error; use tokio_current_thread::spawn; use tokio_reactor::Handle; use tokio_tcp::TcpStream; use super::Token; -use counter::CounterGuard; -use service::{NewService, Service}; +use crate::counter::CounterGuard; +use crate::service::{NewService, Service}; /// Server message pub enum ServerMessage { diff --git a/src/server/worker.rs b/src/server/worker.rs index faada4f3..37682a8b 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -5,6 +5,7 @@ use std::{mem, net, time}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::sync::oneshot; use futures::{future, Async, Future, Poll, Stream}; +use log::{error, info, trace}; use tokio_current_thread::spawn; use tokio_timer::{sleep, Delay}; @@ -14,7 +15,7 @@ use actix::{Arbiter, Message}; use super::accept::AcceptNotify; use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use super::Token; -use counter::Counter; +use crate::counter::Counter; pub(crate) struct WorkerCommand(Conn); @@ -167,7 +168,8 @@ impl Worker { .map_err(|e| { error!("Can not start worker: {:?}", e); Arbiter::current().do_send(StopArbiter(0)); - }).and_then(move |services| { + }) + .and_then(move |services| { for item in services { for (idx, token, service) in item { while token.0 >= wrk.services.len() { @@ -192,7 +194,7 @@ impl Worker { let timeout = self.shutdown_timeout; self.services.iter_mut().for_each(move |h| { if let Some(h) = h { - let _ = h.1.call((None, ServerMessage::Shutdown(timeout.clone()))); + let _ = h.1.call((None, ServerMessage::Shutdown(timeout))); } }); } @@ -249,36 +251,33 @@ impl Future for Worker { fn poll(&mut self) -> Poll { // `StopWorker` message handler - match self.rx2.poll() { - Ok(Async::Ready(Some(StopCommand { graceful, result }))) => { - self.availability.set(false); + if let Ok(Async::Ready(Some(StopCommand { graceful, result }))) = self.rx2.poll() { + self.availability.set(false); + let num = num_connections(); + if num == 0 { + info!("Shutting down worker, 0 connections"); + let _ = result.send(true); + return Ok(Async::Ready(())); + } else if graceful { + self.shutdown(false); let num = num_connections(); - if num == 0 { - info!("Shutting down worker, 0 connections"); + if num != 0 { + info!("Graceful worker shutdown, {} connections", num); + self.state = WorkerState::Shutdown( + sleep(time::Duration::from_secs(1)), + sleep(self.shutdown_timeout), + result, + ); + } else { let _ = result.send(true); return Ok(Async::Ready(())); - } else if graceful { - self.shutdown(false); - let num = num_connections(); - if num != 0 { - info!("Graceful worker shutdown, {} connections", num); - self.state = WorkerState::Shutdown( - sleep(time::Duration::from_secs(1)), - sleep(self.shutdown_timeout), - result, - ); - } else { - let _ = result.send(true); - return Ok(Async::Ready(())); - } - } else { - info!("Force shutdown worker, {} connections", num); - self.shutdown(true); - let _ = result.send(false); - return Ok(Async::Ready(())); } + } else { + info!("Force shutdown worker, {} connections", num); + self.shutdown(true); + let _ = result.send(false); + return Ok(Async::Ready(())); } - _ => (), } let state = mem::replace(&mut self.state, WorkerState::None); diff --git a/src/service/and_then.rs b/src/service/and_then.rs index 0251f80b..dde1aae5 100644 --- a/src/service/and_then.rs +++ b/src/service/and_then.rs @@ -1,7 +1,7 @@ -use futures::{Async, Future, Poll}; +use futures::{try_ready, Async, Future, Poll}; use super::{IntoNewService, NewService, Service}; -use cell::Cell; +use crate::cell::Cell; /// Service for the `and_then` combinator, chaining a computation onto the end /// of another service which completes successfully. @@ -45,7 +45,7 @@ where type Future = AndThenFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let _ = try_ready!(self.a.poll_ready()); + try_ready!(self.a.poll_ready()); self.b.borrow_mut().poll_ready() } diff --git a/src/service/apply.rs b/src/service/apply.rs index 2db93103..b9ccc781 100644 --- a/src/service/apply.rs +++ b/src/service/apply.rs @@ -55,7 +55,7 @@ where type Future = Out::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(|e| e.into()) + self.service.poll_ready() } fn call(&mut self, req: In) -> Self::Future { diff --git a/src/service/then.rs b/src/service/then.rs index 313ab725..38151a16 100644 --- a/src/service/then.rs +++ b/src/service/then.rs @@ -1,7 +1,7 @@ -use futures::{Async, Future, Poll}; +use futures::{try_ready, Async, Future, Poll}; use super::{IntoNewService, NewService, Service}; -use cell::Cell; +use crate::cell::Cell; /// Service for the `then` combinator, chaining a computation onto the end of /// another service. @@ -45,7 +45,7 @@ where type Future = ThenFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let _ = try_ready!(self.a.poll_ready()); + try_ready!(self.a.poll_ready()); self.b.borrow_mut().poll_ready() } diff --git a/src/stream.rs b/src/stream.rs index 3f8948ca..c8067807 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -106,6 +106,12 @@ impl TakeItem { } } +impl Default for TakeItem { + fn default() -> Self { + TakeItem { _t: PhantomData } + } +} + impl Clone for TakeItem { fn clone(&self) -> TakeItem { TakeItem { _t: PhantomData } diff --git a/src/time.rs b/src/time.rs index b1ca52fa..94f5f626 100644 --- a/src/time.rs +++ b/src/time.rs @@ -66,7 +66,7 @@ impl LowResTimeService { /// Get current time. This function has to be called from /// future's poll method, otherwise it panics. pub fn now(&self) -> Instant { - let cur = self.0.borrow().current.clone(); + let cur = self.0.borrow().current; if let Some(cur) = cur { cur } else { diff --git a/src/timeout.rs b/src/timeout.rs index ee7053dc..be5968ad 100644 --- a/src/timeout.rs +++ b/src/timeout.rs @@ -5,10 +5,11 @@ use std::fmt; use std::time::Duration; +use futures::try_ready; use futures::{Async, Future, Poll}; use tokio_timer::{clock, Delay}; -use service::{NewService, Service}; +use crate::service::{NewService, Service}; /// Applies a timeout to requests. #[derive(Debug)] @@ -56,7 +57,7 @@ where fn new_service(&self) -> Self::Future { TimeoutFut { fut: self.inner.new_service(), - timeout: self.timeout.clone(), + timeout: self.timeout, } } } @@ -115,9 +116,7 @@ where type Future = TimeoutServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner - .poll_ready() - .map_err(|e| TimeoutError::Service(e)) + self.inner.poll_ready().map_err(TimeoutError::Service) } fn call(&mut self, request: Request) -> Self::Future {