mirror of
https://github.com/fafhrd91/actix-net
synced 2025-06-29 01:50:35 +02:00
initial import
This commit is contained in:
468
src/accept.rs
Normal file
468
src/accept.rs
Normal file
@ -0,0 +1,468 @@
|
||||
use std::sync::mpsc as sync_mpsc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{io, net, thread};
|
||||
|
||||
use futures::{sync::mpsc, Future};
|
||||
use mio;
|
||||
use slab::Slab;
|
||||
use tokio_timer::Delay;
|
||||
|
||||
use actix::{msgs::Execute, Arbiter, System};
|
||||
|
||||
use super::server::ServerCommand;
|
||||
use super::worker::{Conn, WorkerClient};
|
||||
use super::Token;
|
||||
|
||||
pub(crate) enum Command {
|
||||
Pause,
|
||||
Resume,
|
||||
Stop,
|
||||
Worker(WorkerClient),
|
||||
}
|
||||
|
||||
struct ServerSocketInfo {
|
||||
addr: net::SocketAddr,
|
||||
token: Token,
|
||||
handler: Token,
|
||||
sock: mio::net::TcpListener,
|
||||
timeout: Option<Instant>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AcceptNotify(mio::SetReadiness);
|
||||
|
||||
impl AcceptNotify {
|
||||
pub(crate) fn new(ready: mio::SetReadiness) -> Self {
|
||||
AcceptNotify(ready)
|
||||
}
|
||||
|
||||
pub(crate) fn notify(&self) {
|
||||
let _ = self.0.set_readiness(mio::Ready::readable());
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AcceptNotify {
|
||||
fn default() -> Self {
|
||||
AcceptNotify::new(mio::Registration::new2().1)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AcceptLoop {
|
||||
cmd_reg: Option<mio::Registration>,
|
||||
cmd_ready: mio::SetReadiness,
|
||||
notify_reg: Option<mio::Registration>,
|
||||
notify_ready: mio::SetReadiness,
|
||||
tx: sync_mpsc::Sender<Command>,
|
||||
rx: Option<sync_mpsc::Receiver<Command>>,
|
||||
srv: Option<(
|
||||
mpsc::UnboundedSender<ServerCommand>,
|
||||
mpsc::UnboundedReceiver<ServerCommand>,
|
||||
)>,
|
||||
}
|
||||
|
||||
impl AcceptLoop {
|
||||
pub fn new() -> AcceptLoop {
|
||||
let (tx, rx) = sync_mpsc::channel();
|
||||
let (cmd_reg, cmd_ready) = mio::Registration::new2();
|
||||
let (notify_reg, notify_ready) = mio::Registration::new2();
|
||||
|
||||
AcceptLoop {
|
||||
tx,
|
||||
cmd_ready,
|
||||
cmd_reg: Some(cmd_reg),
|
||||
notify_ready,
|
||||
notify_reg: Some(notify_reg),
|
||||
rx: Some(rx),
|
||||
srv: Some(mpsc::unbounded()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: Command) {
|
||||
let _ = self.tx.send(msg);
|
||||
let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
|
||||
}
|
||||
|
||||
pub fn get_notify(&self) -> AcceptNotify {
|
||||
AcceptNotify::new(self.notify_ready.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn start(
|
||||
&mut self, socks: Vec<(Token, net::TcpListener)>, workers: Vec<WorkerClient>,
|
||||
) -> mpsc::UnboundedReceiver<ServerCommand> {
|
||||
let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo");
|
||||
|
||||
Accept::start(
|
||||
self.rx.take().expect("Can not re-use AcceptInfo"),
|
||||
self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
|
||||
self.notify_reg.take().expect("Can not re-use AcceptInfo"),
|
||||
socks,
|
||||
tx,
|
||||
workers,
|
||||
);
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
struct Accept {
|
||||
poll: mio::Poll,
|
||||
rx: sync_mpsc::Receiver<Command>,
|
||||
sockets: Slab<ServerSocketInfo>,
|
||||
workers: Vec<WorkerClient>,
|
||||
srv: mpsc::UnboundedSender<ServerCommand>,
|
||||
timer: (mio::Registration, mio::SetReadiness),
|
||||
next: usize,
|
||||
backpressure: bool,
|
||||
}
|
||||
|
||||
const DELTA: usize = 100;
|
||||
const CMD: mio::Token = mio::Token(0);
|
||||
const TIMER: mio::Token = mio::Token(1);
|
||||
const NOTIFY: mio::Token = mio::Token(2);
|
||||
|
||||
/// This function defines errors that are per-connection. Which basically
|
||||
/// means that if we get this error from `accept()` system call it means
|
||||
/// next connection might be ready to be accepted.
|
||||
///
|
||||
/// All other errors will incur a timeout before next `accept()` is performed.
|
||||
/// The timeout is useful to handle resource exhaustion errors like ENFILE
|
||||
/// and EMFILE. Otherwise, could enter into tight loop.
|
||||
fn connection_error(e: &io::Error) -> bool {
|
||||
e.kind() == io::ErrorKind::ConnectionRefused
|
||||
|| e.kind() == io::ErrorKind::ConnectionAborted
|
||||
|| e.kind() == io::ErrorKind::ConnectionReset
|
||||
}
|
||||
|
||||
impl Accept {
|
||||
#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
pub(crate) fn start(
|
||||
rx: sync_mpsc::Receiver<Command>, cmd_reg: mio::Registration,
|
||||
notify_reg: mio::Registration, socks: Vec<(Token, net::TcpListener)>,
|
||||
srv: mpsc::UnboundedSender<ServerCommand>, workers: Vec<WorkerClient>,
|
||||
) {
|
||||
let sys = System::current();
|
||||
|
||||
// start accept thread
|
||||
let _ = thread::Builder::new()
|
||||
.name("actix-web accept loop".to_owned())
|
||||
.spawn(move || {
|
||||
System::set_current(sys);
|
||||
let mut accept = Accept::new(rx, socks, workers, srv);
|
||||
|
||||
// Start listening for incoming commands
|
||||
if let Err(err) = accept.poll.register(
|
||||
&cmd_reg,
|
||||
CMD,
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
panic!("Can not register Registration: {}", err);
|
||||
}
|
||||
|
||||
// Start listening for notify updates
|
||||
if let Err(err) = accept.poll.register(
|
||||
¬ify_reg,
|
||||
NOTIFY,
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
panic!("Can not register Registration: {}", err);
|
||||
}
|
||||
|
||||
accept.poll();
|
||||
});
|
||||
}
|
||||
|
||||
fn new(
|
||||
rx: sync_mpsc::Receiver<Command>, socks: Vec<(Token, net::TcpListener)>,
|
||||
workers: Vec<WorkerClient>, srv: mpsc::UnboundedSender<ServerCommand>,
|
||||
) -> Accept {
|
||||
// Create a poll instance
|
||||
let poll = match mio::Poll::new() {
|
||||
Ok(poll) => poll,
|
||||
Err(err) => panic!("Can not create mio::Poll: {}", err),
|
||||
};
|
||||
|
||||
// Start accept
|
||||
let mut sockets = Slab::new();
|
||||
for (idx, (hnd_token, lst)) in socks.into_iter().enumerate() {
|
||||
let addr = lst.local_addr().unwrap();
|
||||
let server = mio::net::TcpListener::from_std(lst)
|
||||
.expect("Can not create mio::net::TcpListener");
|
||||
|
||||
let entry = sockets.vacant_entry();
|
||||
let token = entry.key();
|
||||
|
||||
// Start listening for incoming connections
|
||||
if let Err(err) = poll.register(
|
||||
&server,
|
||||
mio::Token(token + DELTA),
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
panic!("Can not register io: {}", err);
|
||||
}
|
||||
|
||||
entry.insert(ServerSocketInfo {
|
||||
addr,
|
||||
token: hnd_token,
|
||||
handler: Token(idx),
|
||||
sock: server,
|
||||
timeout: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Timer
|
||||
let (tm, tmr) = mio::Registration::new2();
|
||||
if let Err(err) =
|
||||
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
|
||||
{
|
||||
panic!("Can not register Registration: {}", err);
|
||||
}
|
||||
|
||||
Accept {
|
||||
poll,
|
||||
rx,
|
||||
sockets,
|
||||
workers,
|
||||
srv,
|
||||
next: 0,
|
||||
timer: (tm, tmr),
|
||||
backpressure: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(&mut self) {
|
||||
// Create storage for events
|
||||
let mut events = mio::Events::with_capacity(128);
|
||||
|
||||
loop {
|
||||
if let Err(err) = self.poll.poll(&mut events, None) {
|
||||
panic!("Poll error: {}", err);
|
||||
}
|
||||
|
||||
for event in events.iter() {
|
||||
let token = event.token();
|
||||
match token {
|
||||
CMD => if !self.process_cmd() {
|
||||
return;
|
||||
},
|
||||
TIMER => self.process_timer(),
|
||||
NOTIFY => self.backpressure(false),
|
||||
_ => {
|
||||
let token = usize::from(token);
|
||||
if token < DELTA {
|
||||
continue;
|
||||
}
|
||||
self.accept(token - DELTA);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_timer(&mut self) {
|
||||
let now = Instant::now();
|
||||
for (token, info) in self.sockets.iter_mut() {
|
||||
if let Some(inst) = info.timeout.take() {
|
||||
if now > inst {
|
||||
if let Err(err) = self.poll.register(
|
||||
&info.sock,
|
||||
mio::Token(token + DELTA),
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
error!("Can not register server socket {}", err);
|
||||
} else {
|
||||
info!("Resume accepting connections on {}", info.addr);
|
||||
}
|
||||
} else {
|
||||
info.timeout = Some(inst);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_cmd(&mut self) -> bool {
|
||||
loop {
|
||||
match self.rx.try_recv() {
|
||||
Ok(cmd) => match cmd {
|
||||
Command::Pause => {
|
||||
for (_, info) in self.sockets.iter_mut() {
|
||||
if let Err(err) = self.poll.deregister(&info.sock) {
|
||||
error!("Can not deregister server socket {}", err);
|
||||
} else {
|
||||
info!("Paused accepting connections on {}", info.addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::Resume => {
|
||||
for (token, info) in self.sockets.iter() {
|
||||
if let Err(err) = self.poll.register(
|
||||
&info.sock,
|
||||
mio::Token(token + DELTA),
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
error!("Can not resume socket accept process: {}", err);
|
||||
} else {
|
||||
info!(
|
||||
"Accepting connections on {} has been resumed",
|
||||
info.addr
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::Stop => {
|
||||
for (_, info) in self.sockets.iter() {
|
||||
let _ = self.poll.deregister(&info.sock);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
Command::Worker(worker) => {
|
||||
self.backpressure(false);
|
||||
self.workers.push(worker);
|
||||
}
|
||||
},
|
||||
Err(err) => match err {
|
||||
sync_mpsc::TryRecvError::Empty => break,
|
||||
sync_mpsc::TryRecvError::Disconnected => {
|
||||
for (_, info) in self.sockets.iter() {
|
||||
let _ = self.poll.deregister(&info.sock);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
fn backpressure(&mut self, on: bool) {
|
||||
if self.backpressure {
|
||||
if !on {
|
||||
self.backpressure = false;
|
||||
for (token, info) in self.sockets.iter() {
|
||||
if let Err(err) = self.poll.register(
|
||||
&info.sock,
|
||||
mio::Token(token + DELTA),
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
error!("Can not resume socket accept process: {}", err);
|
||||
} else {
|
||||
info!("Accepting connections on {} has been resumed", info.addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if on {
|
||||
self.backpressure = true;
|
||||
for (_, info) in self.sockets.iter() {
|
||||
let _ = self.poll.deregister(&info.sock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn accept_one(&mut self, mut msg: Conn) {
|
||||
if self.backpressure {
|
||||
while !self.workers.is_empty() {
|
||||
match self.workers[self.next].send(msg) {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
|
||||
self.workers[self.next].idx,
|
||||
));
|
||||
msg = err.into_inner();
|
||||
self.workers.swap_remove(self.next);
|
||||
if self.workers.is_empty() {
|
||||
error!("No workers");
|
||||
return;
|
||||
} else if self.workers.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
self.next = (self.next + 1) % self.workers.len();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
let mut idx = 0;
|
||||
while idx < self.workers.len() {
|
||||
idx += 1;
|
||||
if self.workers[self.next].available() {
|
||||
match self.workers[self.next].send(msg) {
|
||||
Ok(_) => {
|
||||
self.next = (self.next + 1) % self.workers.len();
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
|
||||
self.workers[self.next].idx,
|
||||
));
|
||||
msg = err.into_inner();
|
||||
self.workers.swap_remove(self.next);
|
||||
if self.workers.is_empty() {
|
||||
error!("No workers");
|
||||
self.backpressure(true);
|
||||
return;
|
||||
} else if self.workers.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.next = (self.next + 1) % self.workers.len();
|
||||
}
|
||||
// enable backpressure
|
||||
self.backpressure(true);
|
||||
self.accept_one(msg);
|
||||
}
|
||||
}
|
||||
|
||||
fn accept(&mut self, token: usize) {
|
||||
loop {
|
||||
let msg = if let Some(info) = self.sockets.get_mut(token) {
|
||||
match info.sock.accept_std() {
|
||||
Ok((io, addr)) => Conn {
|
||||
io,
|
||||
token: info.token,
|
||||
handler: info.handler,
|
||||
peer: Some(addr),
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
|
||||
Err(ref e) if connection_error(e) => continue,
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
if let Err(err) = self.poll.deregister(&info.sock) {
|
||||
error!("Can not deregister server socket {}", err);
|
||||
}
|
||||
|
||||
// sleep after error
|
||||
info.timeout = Some(Instant::now() + Duration::from_millis(500));
|
||||
|
||||
let r = self.timer.1.clone();
|
||||
System::current().arbiter().do_send(Execute::new(
|
||||
move || -> Result<(), ()> {
|
||||
Arbiter::spawn(
|
||||
Delay::new(Instant::now() + Duration::from_millis(510))
|
||||
.map_err(|_| ())
|
||||
.and_then(move |_| {
|
||||
let _ = r.set_readiness(mio::Ready::readable());
|
||||
Ok(())
|
||||
}),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
));
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
self.accept_one(msg);
|
||||
}
|
||||
}
|
||||
}
|
115
src/extensions.rs
Normal file
115
src/extensions.rs
Normal file
@ -0,0 +1,115 @@
|
||||
use std::any::{Any, TypeId};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::hash::{BuildHasherDefault, Hasher};
|
||||
|
||||
struct IdHasher {
|
||||
id: u64,
|
||||
}
|
||||
|
||||
impl Default for IdHasher {
|
||||
fn default() -> IdHasher {
|
||||
IdHasher { id: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
impl Hasher for IdHasher {
|
||||
fn write(&mut self, bytes: &[u8]) {
|
||||
for &x in bytes {
|
||||
self.id.wrapping_add(u64::from(x));
|
||||
}
|
||||
}
|
||||
|
||||
fn write_u64(&mut self, u: u64) {
|
||||
self.id = u;
|
||||
}
|
||||
|
||||
fn finish(&self) -> u64 {
|
||||
self.id
|
||||
}
|
||||
}
|
||||
|
||||
type AnyMap = HashMap<TypeId, Box<Any>, BuildHasherDefault<IdHasher>>;
|
||||
|
||||
/// A type map of request extensions.
|
||||
#[derive(Default)]
|
||||
pub struct Extensions {
|
||||
map: AnyMap,
|
||||
}
|
||||
|
||||
impl Extensions {
|
||||
/// Create an empty `Extensions`.
|
||||
#[inline]
|
||||
pub fn new() -> Extensions {
|
||||
Extensions {
|
||||
map: HashMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a type into this `Extensions`.
|
||||
///
|
||||
/// If a extension of this type already existed, it will
|
||||
/// be returned.
|
||||
pub fn insert<T: 'static>(&mut self, val: T) {
|
||||
self.map.insert(TypeId::of::<T>(), Box::new(val));
|
||||
}
|
||||
|
||||
/// Get a reference to a type previously inserted on this `Extensions`.
|
||||
pub fn get<T: 'static>(&self) -> Option<&T> {
|
||||
self.map
|
||||
.get(&TypeId::of::<T>())
|
||||
.and_then(|boxed| (&**boxed as &(Any + 'static)).downcast_ref())
|
||||
}
|
||||
|
||||
/// Get a mutable reference to a type previously inserted on this
|
||||
/// `Extensions`.
|
||||
pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
|
||||
self.map
|
||||
.get_mut(&TypeId::of::<T>())
|
||||
.and_then(|boxed| (&mut **boxed as &mut (Any + 'static)).downcast_mut())
|
||||
}
|
||||
|
||||
/// Remove a type from this `Extensions`.
|
||||
///
|
||||
/// If a extension of this type existed, it will be returned.
|
||||
pub fn remove<T: 'static>(&mut self) -> Option<T> {
|
||||
self.map.remove(&TypeId::of::<T>()).and_then(|boxed| {
|
||||
(boxed as Box<Any + 'static>)
|
||||
.downcast()
|
||||
.ok()
|
||||
.map(|boxed| *boxed)
|
||||
})
|
||||
}
|
||||
|
||||
/// Clear the `Extensions` of all inserted extensions.
|
||||
#[inline]
|
||||
pub fn clear(&mut self) {
|
||||
self.map.clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Extensions {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Extensions").finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_extensions() {
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct MyType(i32);
|
||||
|
||||
let mut extensions = Extensions::new();
|
||||
|
||||
extensions.insert(5i32);
|
||||
extensions.insert(MyType(10));
|
||||
|
||||
assert_eq!(extensions.get(), Some(&5i32));
|
||||
assert_eq!(extensions.get_mut(), Some(&mut 5i32));
|
||||
|
||||
assert_eq!(extensions.remove::<i32>(), Some(5i32));
|
||||
assert!(extensions.get::<i32>().is_none());
|
||||
|
||||
assert_eq!(extensions.get::<bool>(), None);
|
||||
assert_eq!(extensions.get(), Some(&MyType(10)));
|
||||
}
|
162
src/lib.rs
Normal file
162
src/lib.rs
Normal file
@ -0,0 +1,162 @@
|
||||
//! Actix web is a small, pragmatic, and extremely fast web framework
|
||||
//! for Rust.
|
||||
//!
|
||||
//! ## Package feature
|
||||
//!
|
||||
//! * `tls` - enables ssl support via `native-tls` crate
|
||||
//! * `alpn` - enables ssl support via `openssl` crate, require for `http/2`
|
||||
//! support
|
||||
//! * `rust-tls` - enables ssl support via `rustls` crate
|
||||
//!
|
||||
|
||||
// #![warn(missing_docs)]
|
||||
// #![allow(
|
||||
// dead_code,
|
||||
// unused_variables,
|
||||
// unused_imports,
|
||||
// patterns_in_fns_without_body
|
||||
// )]
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate byteorder;
|
||||
extern crate bytes;
|
||||
extern crate failure;
|
||||
extern crate futures;
|
||||
extern crate mio;
|
||||
extern crate net2;
|
||||
extern crate num_cpus;
|
||||
extern crate parking_lot;
|
||||
extern crate slab;
|
||||
extern crate time;
|
||||
extern crate tokio;
|
||||
extern crate tokio_io;
|
||||
extern crate tokio_reactor;
|
||||
extern crate tokio_tcp;
|
||||
extern crate tokio_timer;
|
||||
extern crate tower_service;
|
||||
|
||||
#[macro_use]
|
||||
extern crate actix;
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
extern crate native_tls;
|
||||
|
||||
#[cfg(feature = "ssl")]
|
||||
extern crate openssl;
|
||||
#[cfg(feature = "ssl")]
|
||||
extern crate tokio_openssl;
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
extern crate rustls;
|
||||
#[cfg(feature = "rust-tls")]
|
||||
extern crate tokio_rustls;
|
||||
#[cfg(feature = "rust-tls")]
|
||||
extern crate webpki;
|
||||
#[cfg(feature = "rust-tls")]
|
||||
extern crate webpki_roots;
|
||||
|
||||
use std::io;
|
||||
use std::net::Shutdown;
|
||||
use std::rc::Rc;
|
||||
|
||||
use actix::Message;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use futures::{Async, Poll};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
pub(crate) mod accept;
|
||||
mod extensions;
|
||||
mod server;
|
||||
mod server_service;
|
||||
pub mod service;
|
||||
pub mod ssl;
|
||||
mod worker;
|
||||
|
||||
pub use self::server::{ConnectionRateTag, ConnectionTag, Connections, Server};
|
||||
pub use service::{IntoNewService, IntoService};
|
||||
|
||||
pub use extensions::Extensions;
|
||||
|
||||
/// Pause accepting incoming connections
|
||||
///
|
||||
/// If socket contains some pending connection, they might be dropped.
|
||||
/// All opened connection remains active.
|
||||
#[derive(Message)]
|
||||
pub struct PauseServer;
|
||||
|
||||
/// Resume accepting incoming connections
|
||||
#[derive(Message)]
|
||||
pub struct ResumeServer;
|
||||
|
||||
/// Stop incoming connection processing, stop all workers and exit.
|
||||
///
|
||||
/// If server starts with `spawn()` method, then spawned thread get terminated.
|
||||
pub struct StopServer {
|
||||
/// Whether to try and shut down gracefully
|
||||
pub graceful: bool,
|
||||
}
|
||||
|
||||
impl Message for StopServer {
|
||||
type Result = Result<(), ()>;
|
||||
}
|
||||
|
||||
/// Socket id token
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct Token(usize);
|
||||
|
||||
// impl Token {
|
||||
// pub(crate) fn new(val: usize) -> Token {
|
||||
// Token(val)
|
||||
// }
|
||||
// }
|
||||
|
||||
const LW_BUFFER_SIZE: usize = 4096;
|
||||
const HW_BUFFER_SIZE: usize = 32_768;
|
||||
|
||||
#[doc(hidden)]
|
||||
/// Low-level io stream operations
|
||||
pub trait IoStream: AsyncRead + AsyncWrite + 'static {
|
||||
fn shutdown(&mut self, how: Shutdown) -> io::Result<()>;
|
||||
|
||||
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>;
|
||||
|
||||
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
|
||||
|
||||
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<bool, io::Error> {
|
||||
let mut read_some = false;
|
||||
loop {
|
||||
if buf.remaining_mut() < LW_BUFFER_SIZE {
|
||||
buf.reserve(HW_BUFFER_SIZE);
|
||||
}
|
||||
unsafe {
|
||||
match self.read(buf.bytes_mut()) {
|
||||
Ok(n) => {
|
||||
if n == 0 {
|
||||
return Ok(Async::Ready(!read_some));
|
||||
} else {
|
||||
read_some = true;
|
||||
buf.advance_mut(n);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return if e.kind() == io::ErrorKind::WouldBlock {
|
||||
if read_some {
|
||||
Ok(Async::Ready(false))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
} else {
|
||||
Err(e)
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extra io stream extensions
|
||||
fn extensions(&self) -> Option<Rc<Extensions>> {
|
||||
None
|
||||
}
|
||||
}
|
556
src/server.rs
Normal file
556
src/server.rs
Normal file
@ -0,0 +1,556 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use std::{fmt, io, mem, net};
|
||||
|
||||
use futures::sync::{mpsc, mpsc::unbounded};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use net2::TcpBuilder;
|
||||
use num_cpus;
|
||||
use tokio_tcp::TcpStream;
|
||||
use tower_service::NewService;
|
||||
|
||||
use actix::{
|
||||
actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
|
||||
Response, StreamHandler, System, WrapFuture,
|
||||
};
|
||||
|
||||
use super::accept::{AcceptLoop, AcceptNotify, Command};
|
||||
use super::server_service::{ServerNewService, ServerServiceFactory};
|
||||
use super::service::IntoNewService;
|
||||
use super::worker::{Conn, StopWorker, Worker, WorkerClient};
|
||||
use super::{PauseServer, ResumeServer, StopServer, Token};
|
||||
|
||||
pub(crate) enum ServerCommand {
|
||||
WorkerDied(usize),
|
||||
}
|
||||
|
||||
/// Server
|
||||
pub struct Server {
|
||||
threads: usize,
|
||||
workers: Vec<(usize, Addr<Worker>)>,
|
||||
services: Vec<Box<ServerServiceFactory + Send>>,
|
||||
sockets: Vec<(Token, net::TcpListener)>,
|
||||
accept: AcceptLoop,
|
||||
exit: bool,
|
||||
shutdown_timeout: u16,
|
||||
signals: Option<Addr<signal::ProcessSignals>>,
|
||||
no_signals: bool,
|
||||
maxconn: usize,
|
||||
maxconnrate: usize,
|
||||
}
|
||||
|
||||
impl Default for Server {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Server {
|
||||
/// Create new Server instance
|
||||
pub fn new() -> Server {
|
||||
Server {
|
||||
threads: num_cpus::get(),
|
||||
workers: Vec::new(),
|
||||
services: Vec::new(),
|
||||
sockets: Vec::new(),
|
||||
accept: AcceptLoop::new(),
|
||||
exit: false,
|
||||
shutdown_timeout: 30,
|
||||
signals: None,
|
||||
no_signals: false,
|
||||
maxconn: 102_400,
|
||||
maxconnrate: 256,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set number of workers to start.
|
||||
///
|
||||
/// By default http server uses number of available logical cpu as threads
|
||||
/// count.
|
||||
pub fn workers(mut self, num: usize) -> Self {
|
||||
self.threads = num;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum per-worker number of concurrent connections.
|
||||
///
|
||||
/// All socket listeners will stop accepting connections when this limit is
|
||||
/// reached for each worker.
|
||||
///
|
||||
/// By default max connections is set to a 100k.
|
||||
pub fn maxconn(mut self, num: usize) -> Self {
|
||||
self.maxconn = num;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum per-worker concurrent connection establish process.
|
||||
///
|
||||
/// All listeners will stop accepting connections when this limit is
|
||||
/// reached. It can be used to limit the global SSL CPU usage.
|
||||
///
|
||||
/// By default max connections is set to a 256.
|
||||
pub fn maxconnrate(mut self, num: usize) -> Self {
|
||||
self.maxconnrate = num;
|
||||
self
|
||||
}
|
||||
|
||||
/// Stop actix system.
|
||||
///
|
||||
/// `SystemExit` message stops currently running system.
|
||||
pub fn system_exit(mut self) -> Self {
|
||||
self.exit = true;
|
||||
self
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
/// Set alternative address for `ProcessSignals` actor.
|
||||
pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
|
||||
self.signals = Some(addr);
|
||||
self
|
||||
}
|
||||
|
||||
/// Disable signal handling
|
||||
pub fn disable_signals(mut self) -> Self {
|
||||
self.no_signals = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Timeout for graceful workers shutdown.
|
||||
///
|
||||
/// After receiving a stop signal, workers have this much time to finish
|
||||
/// serving requests. Workers still alive after the timeout are force
|
||||
/// dropped.
|
||||
///
|
||||
/// By default shutdown timeout sets to 30 seconds.
|
||||
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
|
||||
self.shutdown_timeout = sec;
|
||||
self
|
||||
}
|
||||
|
||||
/// Add new service to server
|
||||
pub fn bind<T, U, N>(mut self, addr: U, srv: T) -> io::Result<Self>
|
||||
where
|
||||
U: net::ToSocketAddrs,
|
||||
T: IntoNewService<N> + Clone,
|
||||
N: NewService<Request = TcpStream, Response = (), InitError = io::Error>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
N::Service: 'static,
|
||||
N::Future: 'static,
|
||||
N::Error: fmt::Display,
|
||||
{
|
||||
let sockets = bind_addr(addr)?;
|
||||
|
||||
for lst in sockets {
|
||||
self = self.listen(lst, srv.clone())
|
||||
}
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Add new service to server
|
||||
pub fn listen<T, N>(mut self, lst: net::TcpListener, srv: T) -> Self
|
||||
where
|
||||
T: IntoNewService<N>,
|
||||
N: NewService<Request = TcpStream, Response = (), InitError = io::Error>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
N::Service: 'static,
|
||||
N::Future: 'static,
|
||||
N::Error: fmt::Display,
|
||||
{
|
||||
let token = Token(self.services.len());
|
||||
self.services.push(ServerNewService::create(srv.into()));
|
||||
self.sockets.push((token, lst));
|
||||
self
|
||||
}
|
||||
|
||||
/// Spawn new thread and start listening for incoming connections.
|
||||
///
|
||||
/// This method spawns new thread and starts new actix system. Other than
|
||||
/// that it is similar to `start()` method. This method blocks.
|
||||
///
|
||||
/// This methods panics if no socket addresses get bound.
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// # extern crate futures;
|
||||
/// # extern crate actix_web;
|
||||
/// # use futures::Future;
|
||||
/// use actix_web::*;
|
||||
///
|
||||
/// fn main() {
|
||||
/// Server::new().
|
||||
/// .service(
|
||||
/// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok())))
|
||||
/// .bind("127.0.0.1:0")
|
||||
/// .expect("Can not bind to 127.0.0.1:0"))
|
||||
/// .run();
|
||||
/// }
|
||||
/// ```
|
||||
pub fn run(self) {
|
||||
let sys = System::new("http-server");
|
||||
self.start();
|
||||
sys.run();
|
||||
}
|
||||
|
||||
/// Starts Server Actor and returns its address
|
||||
pub fn start(mut self) -> Addr<Server> {
|
||||
if self.sockets.is_empty() {
|
||||
panic!("Service should have at least one bound socket");
|
||||
} else {
|
||||
info!("Starting {} http workers", self.threads);
|
||||
|
||||
// start workers
|
||||
let mut workers = Vec::new();
|
||||
for idx in 0..self.threads {
|
||||
let (addr, worker) = self.start_worker(idx, self.accept.get_notify());
|
||||
workers.push(worker);
|
||||
self.workers.push((idx, addr));
|
||||
}
|
||||
|
||||
// start accept thread
|
||||
for sock in &self.sockets {
|
||||
info!("Starting server on http://{:?}", sock.1.local_addr().ok());
|
||||
}
|
||||
let rx = self
|
||||
.accept
|
||||
.start(mem::replace(&mut self.sockets, Vec::new()), workers);
|
||||
|
||||
// start http server actor
|
||||
let signals = self.subscribe_to_signals();
|
||||
let addr = Actor::create(move |ctx| {
|
||||
ctx.add_stream(rx);
|
||||
self
|
||||
});
|
||||
if let Some(signals) = signals {
|
||||
signals.do_send(signal::Subscribe(addr.clone().recipient()))
|
||||
}
|
||||
addr
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to os signals
|
||||
fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
|
||||
if !self.no_signals {
|
||||
if let Some(ref signals) = self.signals {
|
||||
Some(signals.clone())
|
||||
} else {
|
||||
Some(System::current().registry().get::<signal::ProcessSignals>())
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) {
|
||||
let (tx, rx) = unbounded::<Conn>();
|
||||
let conns = Connections::new(notify, self.maxconn, self.maxconnrate);
|
||||
let worker = WorkerClient::new(idx, tx, conns.clone());
|
||||
let services: Vec<Box<ServerServiceFactory + Send>> =
|
||||
self.services.iter().map(|v| v.clone_factory()).collect();
|
||||
|
||||
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
|
||||
ctx.add_message_stream(rx);
|
||||
Worker::new(ctx, services)
|
||||
});
|
||||
|
||||
(addr, worker)
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for Server {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
/// Signals support
|
||||
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
|
||||
/// message to `System` actor.
|
||||
impl Handler<signal::Signal> for Server {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
|
||||
match msg.0 {
|
||||
signal::SignalType::Int => {
|
||||
info!("SIGINT received, exiting");
|
||||
self.exit = true;
|
||||
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
|
||||
}
|
||||
signal::SignalType::Term => {
|
||||
info!("SIGTERM received, stopping");
|
||||
self.exit = true;
|
||||
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
|
||||
}
|
||||
signal::SignalType::Quit => {
|
||||
info!("SIGQUIT received, exiting");
|
||||
self.exit = true;
|
||||
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<PauseServer> for Server {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
|
||||
self.accept.send(Command::Pause);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<ResumeServer> for Server {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
|
||||
self.accept.send(Command::Resume);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<StopServer> for Server {
|
||||
type Result = Response<(), ()>;
|
||||
|
||||
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
|
||||
// stop accept thread
|
||||
self.accept.send(Command::Stop);
|
||||
|
||||
// stop workers
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
|
||||
let dur = if msg.graceful {
|
||||
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
for worker in &self.workers {
|
||||
let tx2 = tx.clone();
|
||||
ctx.spawn(
|
||||
worker
|
||||
.1
|
||||
.send(StopWorker { graceful: dur })
|
||||
.into_actor(self)
|
||||
.then(move |_, slf, ctx| {
|
||||
slf.workers.pop();
|
||||
if slf.workers.is_empty() {
|
||||
let _ = tx2.send(());
|
||||
|
||||
// we need to stop system if server was spawned
|
||||
if slf.exit {
|
||||
ctx.run_later(Duration::from_millis(300), |_, _| {
|
||||
System::current().stop();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fut::ok(())
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
if !self.workers.is_empty() {
|
||||
Response::async(rx.into_future().map(|_| ()).map_err(|_| ()))
|
||||
} else {
|
||||
// we need to stop system if server was spawned
|
||||
if self.exit {
|
||||
ctx.run_later(Duration::from_millis(300), |_, _| {
|
||||
System::current().stop();
|
||||
});
|
||||
}
|
||||
Response::reply(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Commands from accept threads
|
||||
impl StreamHandler<ServerCommand, ()> for Server {
|
||||
fn finished(&mut self, _: &mut Context<Self>) {}
|
||||
|
||||
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
|
||||
match msg {
|
||||
ServerCommand::WorkerDied(idx) => {
|
||||
let mut found = false;
|
||||
for i in 0..self.workers.len() {
|
||||
if self.workers[i].0 == idx {
|
||||
self.workers.swap_remove(i);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
error!("Worker has died {:?}, restarting", idx);
|
||||
|
||||
let mut new_idx = self.workers.len();
|
||||
'found: loop {
|
||||
for i in 0..self.workers.len() {
|
||||
if self.workers[i].0 == new_idx {
|
||||
new_idx += 1;
|
||||
continue 'found;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify());
|
||||
self.workers.push((new_idx, addr));
|
||||
self.accept.send(Command::Worker(worker));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
/// Contains information about connection.
|
||||
pub struct Connections(Arc<ConnectionsInner>);
|
||||
|
||||
impl Connections {
|
||||
fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self {
|
||||
let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 };
|
||||
let maxconnrate_low = if maxconnrate > 10 {
|
||||
maxconnrate - 10
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
Connections(Arc::new(ConnectionsInner {
|
||||
notify,
|
||||
maxconn,
|
||||
maxconnrate,
|
||||
maxconn_low,
|
||||
maxconnrate_low,
|
||||
conn: AtomicUsize::new(0),
|
||||
connrate: AtomicUsize::new(0),
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) fn available(&self) -> bool {
|
||||
self.0.available()
|
||||
}
|
||||
|
||||
pub(crate) fn num_connections(&self) -> usize {
|
||||
self.0.conn.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Report opened connection
|
||||
pub fn connection(&self) -> ConnectionTag {
|
||||
ConnectionTag::new(self.0.clone())
|
||||
}
|
||||
|
||||
/// Report rate connection, rate is usually ssl handshake
|
||||
pub fn connection_rate(&self) -> ConnectionRateTag {
|
||||
ConnectionRateTag::new(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ConnectionsInner {
|
||||
notify: AcceptNotify,
|
||||
conn: AtomicUsize,
|
||||
connrate: AtomicUsize,
|
||||
maxconn: usize,
|
||||
maxconnrate: usize,
|
||||
maxconn_low: usize,
|
||||
maxconnrate_low: usize,
|
||||
}
|
||||
|
||||
impl ConnectionsInner {
|
||||
fn available(&self) -> bool {
|
||||
if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) {
|
||||
false
|
||||
} else {
|
||||
self.maxconn > self.conn.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_maxconn(&self, maxconn: usize) {
|
||||
if maxconn > self.maxconn_low && maxconn <= self.maxconn {
|
||||
self.notify.notify();
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_maxconnrate(&self, connrate: usize) {
|
||||
if connrate > self.maxconnrate_low && connrate <= self.maxconnrate {
|
||||
self.notify.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Type responsible for max connection stat.
|
||||
///
|
||||
/// Max connections stat get updated on drop.
|
||||
pub struct ConnectionTag(Arc<ConnectionsInner>);
|
||||
|
||||
impl ConnectionTag {
|
||||
fn new(inner: Arc<ConnectionsInner>) -> Self {
|
||||
inner.conn.fetch_add(1, Ordering::Relaxed);
|
||||
ConnectionTag(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ConnectionTag {
|
||||
fn drop(&mut self) {
|
||||
let conn = self.0.conn.fetch_sub(1, Ordering::Relaxed);
|
||||
self.0.notify_maxconn(conn);
|
||||
}
|
||||
}
|
||||
|
||||
/// Type responsible for max connection rate stat.
|
||||
///
|
||||
/// Max connections rate stat get updated on drop.
|
||||
pub struct ConnectionRateTag(Arc<ConnectionsInner>);
|
||||
|
||||
impl ConnectionRateTag {
|
||||
fn new(inner: Arc<ConnectionsInner>) -> Self {
|
||||
inner.connrate.fetch_add(1, Ordering::Relaxed);
|
||||
ConnectionRateTag(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ConnectionRateTag {
|
||||
fn drop(&mut self) {
|
||||
let connrate = self.0.connrate.fetch_sub(1, Ordering::Relaxed);
|
||||
self.0.notify_maxconnrate(connrate);
|
||||
}
|
||||
}
|
||||
|
||||
fn bind_addr<S: net::ToSocketAddrs>(addr: S) -> io::Result<Vec<net::TcpListener>> {
|
||||
let mut err = None;
|
||||
let mut succ = false;
|
||||
let mut sockets = Vec::new();
|
||||
for addr in addr.to_socket_addrs()? {
|
||||
match create_tcp_listener(addr) {
|
||||
Ok(lst) => {
|
||||
succ = true;
|
||||
sockets.push(lst);
|
||||
}
|
||||
Err(e) => err = Some(e),
|
||||
}
|
||||
}
|
||||
|
||||
if !succ {
|
||||
if let Some(e) = err.take() {
|
||||
Err(e)
|
||||
} else {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Can not bind to address.",
|
||||
))
|
||||
}
|
||||
} else {
|
||||
Ok(sockets)
|
||||
}
|
||||
}
|
||||
|
||||
fn create_tcp_listener(addr: net::SocketAddr) -> io::Result<net::TcpListener> {
|
||||
let builder = match addr {
|
||||
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
|
||||
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
|
||||
};
|
||||
builder.reuse_address(true)?;
|
||||
builder.bind(addr)?;
|
||||
Ok(builder.listen(1024)?)
|
||||
}
|
106
src/server_service.rs
Normal file
106
src/server_service.rs
Normal file
@ -0,0 +1,106 @@
|
||||
use std::{fmt, io, net};
|
||||
|
||||
use futures::{future, Future, Poll};
|
||||
use tokio_reactor::Handle;
|
||||
use tokio_tcp::TcpStream;
|
||||
use tower_service::{NewService, Service};
|
||||
|
||||
pub(crate) type BoxedServerService = Box<
|
||||
Service<
|
||||
Request = net::TcpStream,
|
||||
Response = (),
|
||||
Error = (),
|
||||
Future = Box<Future<Item = (), Error = ()>>,
|
||||
>,
|
||||
>;
|
||||
|
||||
pub(crate) struct ServerService<T> {
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> Service for ServerService<T>
|
||||
where
|
||||
T: Service<Request = TcpStream, Response = ()>,
|
||||
T::Future: 'static,
|
||||
T::Error: fmt::Display + 'static,
|
||||
{
|
||||
type Request = net::TcpStream;
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = Box<Future<Item = (), Error = ()>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready().map_err(|_| ())
|
||||
}
|
||||
|
||||
fn call(&mut self, stream: net::TcpStream) -> Self::Future {
|
||||
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
|
||||
error!("Can not convert to an async tcp stream: {}", e);
|
||||
});
|
||||
|
||||
if let Ok(stream) = stream {
|
||||
Box::new(self.inner.call(stream).map_err(|_| ()))
|
||||
} else {
|
||||
Box::new(future::err(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ServerNewService<T> {
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> ServerNewService<T>
|
||||
where
|
||||
T: NewService<Request = TcpStream, Response = (), InitError = io::Error>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
T::Service: 'static,
|
||||
T::Future: 'static,
|
||||
T::Error: fmt::Display,
|
||||
{
|
||||
pub(crate) fn create(inner: T) -> Box<ServerServiceFactory + Send> {
|
||||
Box::new(Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ServerServiceFactory {
|
||||
fn clone_factory(&self) -> Box<ServerServiceFactory + Send>;
|
||||
|
||||
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
|
||||
}
|
||||
|
||||
impl<T> ServerServiceFactory for ServerNewService<T>
|
||||
where
|
||||
T: NewService<Request = TcpStream, Response = (), InitError = io::Error>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
T::Service: 'static,
|
||||
T::Future: 'static,
|
||||
T::Error: fmt::Display,
|
||||
{
|
||||
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> {
|
||||
Box::new(Self {
|
||||
inner: self.inner.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
|
||||
Box::new(self.inner.new_service().map_err(|_| ()).map(|inner| {
|
||||
let service: BoxedServerService = Box::new(ServerService { inner });
|
||||
service
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerServiceFactory for Box<ServerServiceFactory> {
|
||||
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> {
|
||||
self.as_ref().clone_factory()
|
||||
}
|
||||
|
||||
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
|
||||
self.as_ref().create()
|
||||
}
|
||||
}
|
521
src/service.rs
Normal file
521
src/service.rs
Normal file
@ -0,0 +1,521 @@
|
||||
use std::cell::RefCell;
|
||||
use std::marker;
|
||||
use std::rc::Rc;
|
||||
|
||||
use futures::{future, future::FutureResult, Async, Future, IntoFuture, Poll};
|
||||
use tower_service::{NewService, Service};
|
||||
|
||||
pub trait NewServiceExt: NewService {
|
||||
fn and_then<F, B>(self, new_service: F) -> AndThenNewService<Self, B>
|
||||
where
|
||||
Self: Sized,
|
||||
F: IntoNewService<B>,
|
||||
B: NewService<
|
||||
Request = Self::Response,
|
||||
Error = Self::Error,
|
||||
InitError = Self::InitError,
|
||||
>;
|
||||
}
|
||||
|
||||
impl<T> NewServiceExt for T
|
||||
where
|
||||
T: NewService,
|
||||
{
|
||||
fn and_then<F, B>(self, new_service: F) -> AndThenNewService<Self, B>
|
||||
where
|
||||
F: IntoNewService<B>,
|
||||
B: NewService<
|
||||
Request = Self::Response,
|
||||
Error = Self::Error,
|
||||
InitError = Self::InitError,
|
||||
>,
|
||||
{
|
||||
AndThenNewService::new(self, new_service)
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for types that can be converted to a Service
|
||||
pub trait IntoService<T>
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
/// Create service
|
||||
fn into(self) -> T;
|
||||
}
|
||||
|
||||
/// Trait for types that can be converted to a Service
|
||||
pub trait IntoNewService<T>
|
||||
where
|
||||
T: NewService,
|
||||
{
|
||||
/// Create service
|
||||
fn into(self) -> T;
|
||||
}
|
||||
|
||||
impl<T> IntoService<T> for T
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
fn into(self) -> T {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> IntoNewService<T> for T
|
||||
where
|
||||
T: NewService,
|
||||
{
|
||||
fn into(self) -> T {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Req, Resp, Err, Fut> IntoService<FnService<F, Req, Resp, Err, Fut>> for F
|
||||
where
|
||||
F: Fn(Req) -> Fut + 'static,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
fn into(self) -> FnService<F, Req, Resp, Err, Fut> {
|
||||
FnService::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FnService<F, Req, Resp, E, Fut>
|
||||
where
|
||||
F: Fn(Req) -> Fut,
|
||||
Fut: IntoFuture<Item = Resp, Error = E>,
|
||||
{
|
||||
f: F,
|
||||
req: marker::PhantomData<Req>,
|
||||
resp: marker::PhantomData<Resp>,
|
||||
err: marker::PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<F, Req, Resp, E, Fut> FnService<F, Req, Resp, E, Fut>
|
||||
where
|
||||
F: Fn(Req) -> Fut,
|
||||
Fut: IntoFuture<Item = Resp, Error = E>,
|
||||
{
|
||||
pub fn new(f: F) -> Self {
|
||||
FnService {
|
||||
f,
|
||||
req: marker::PhantomData,
|
||||
resp: marker::PhantomData,
|
||||
err: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Req, Resp, E, Fut> Service for FnService<F, Req, Resp, E, Fut>
|
||||
where
|
||||
F: Fn(Req) -> Fut,
|
||||
Fut: IntoFuture<Item = Resp, Error = E>,
|
||||
{
|
||||
type Request = Req;
|
||||
type Response = Resp;
|
||||
type Error = E;
|
||||
type Future = Fut::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
(self.f)(req).into_future()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FnNewService<F, Req, Resp, Err, Fut>
|
||||
where
|
||||
F: Fn(Req) -> Fut,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
f: F,
|
||||
req: marker::PhantomData<Req>,
|
||||
resp: marker::PhantomData<Resp>,
|
||||
err: marker::PhantomData<Err>,
|
||||
}
|
||||
|
||||
impl<F, Req, Resp, Err, Fut> FnNewService<F, Req, Resp, Err, Fut>
|
||||
where
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
fn new(f: F) -> Self {
|
||||
FnNewService {
|
||||
f,
|
||||
req: marker::PhantomData,
|
||||
resp: marker::PhantomData,
|
||||
err: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Req, Resp, Err, Fut> NewService for FnNewService<F, Req, Resp, Err, Fut>
|
||||
where
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
type Request = Req;
|
||||
type Response = Resp;
|
||||
type Error = Err;
|
||||
type Service = FnService<F, Req, Resp, Err, Fut>;
|
||||
type InitError = ();
|
||||
type Future = FutureResult<Self::Service, ()>;
|
||||
|
||||
fn new_service(&self) -> Self::Future {
|
||||
future::ok(FnService::new(self.f.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Req, Resp, Err, Fut> IntoNewService<FnNewService<F, Req, Resp, Err, Fut>> for F
|
||||
where
|
||||
F: Fn(Req) -> Fut + Clone + 'static,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
fn into(self) -> FnNewService<F, Req, Resp, Err, Fut> {
|
||||
FnNewService::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Req, Resp, Err, Fut> Clone for FnNewService<F, Req, Resp, Err, Fut>
|
||||
where
|
||||
F: Fn(Req) -> Fut + Clone,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self::new(self.f.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FnStateService<S, F, Req, Resp, Err, Fut>
|
||||
where
|
||||
F: Fn(&mut S, Req) -> Fut,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
f: F,
|
||||
state: S,
|
||||
req: marker::PhantomData<Req>,
|
||||
resp: marker::PhantomData<Resp>,
|
||||
err: marker::PhantomData<Err>,
|
||||
}
|
||||
|
||||
impl<S, F, Req, Resp, Err, Fut> FnStateService<S, F, Req, Resp, Err, Fut>
|
||||
where
|
||||
F: Fn(&mut S, Req) -> Fut,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
pub fn new(state: S, f: F) -> Self {
|
||||
FnStateService {
|
||||
f,
|
||||
state,
|
||||
req: marker::PhantomData,
|
||||
resp: marker::PhantomData,
|
||||
err: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, F, Req, Resp, Err, Fut> Service for FnStateService<S, F, Req, Resp, Err, Fut>
|
||||
where
|
||||
F: Fn(&mut S, Req) -> Fut,
|
||||
Fut: IntoFuture<Item = Resp, Error = Err>,
|
||||
{
|
||||
type Request = Req;
|
||||
type Response = Resp;
|
||||
type Error = Err;
|
||||
type Future = Fut::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
(self.f)(&mut self.state, req).into_future()
|
||||
}
|
||||
}
|
||||
|
||||
/// `NewService` for state and handler functions
|
||||
pub struct FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> {
|
||||
f: F1,
|
||||
state: F2,
|
||||
s: marker::PhantomData<S>,
|
||||
req: marker::PhantomData<Req>,
|
||||
resp: marker::PhantomData<Resp>,
|
||||
err1: marker::PhantomData<Err1>,
|
||||
err2: marker::PhantomData<Err2>,
|
||||
fut1: marker::PhantomData<Fut1>,
|
||||
fut2: marker::PhantomData<Fut2>,
|
||||
}
|
||||
|
||||
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>
|
||||
FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>
|
||||
{
|
||||
fn new(f: F1, state: F2) -> Self {
|
||||
FnStateNewService {
|
||||
f,
|
||||
state,
|
||||
s: marker::PhantomData,
|
||||
req: marker::PhantomData,
|
||||
resp: marker::PhantomData,
|
||||
err1: marker::PhantomData,
|
||||
err2: marker::PhantomData,
|
||||
fut1: marker::PhantomData,
|
||||
fut2: marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> NewService
|
||||
for FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>
|
||||
where
|
||||
S: 'static,
|
||||
F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static,
|
||||
F2: Fn() -> Fut2,
|
||||
Fut1: IntoFuture<Item = Resp, Error = Err1> + 'static,
|
||||
Fut2: IntoFuture<Item = S, Error = Err2> + 'static,
|
||||
Req: 'static,
|
||||
Resp: 'static,
|
||||
Err1: 'static,
|
||||
Err2: 'static,
|
||||
{
|
||||
type Request = Req;
|
||||
type Response = Resp;
|
||||
type Error = Err1;
|
||||
type Service = FnStateService<S, F1, Req, Resp, Err1, Fut1>;
|
||||
type InitError = Err2;
|
||||
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
|
||||
|
||||
fn new_service(&self) -> Self::Future {
|
||||
let f = self.f.clone();
|
||||
Box::new(
|
||||
(self.state)()
|
||||
.into_future()
|
||||
.and_then(move |state| Ok(FnStateService::new(state, f))),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>
|
||||
IntoNewService<FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>> for (F1, F2)
|
||||
where
|
||||
S: 'static,
|
||||
F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static,
|
||||
F2: Fn() -> Fut2,
|
||||
Fut1: IntoFuture<Item = Resp, Error = Err1> + 'static,
|
||||
Fut2: IntoFuture<Item = S, Error = Err2> + 'static,
|
||||
Req: 'static,
|
||||
Resp: 'static,
|
||||
Err1: 'static,
|
||||
Err2: 'static,
|
||||
{
|
||||
fn into(self) -> FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> {
|
||||
FnStateNewService::new(self.0, self.1)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> Clone
|
||||
for FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>
|
||||
where
|
||||
F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static,
|
||||
F2: Fn() -> Fut2 + Clone,
|
||||
Fut1: IntoFuture<Item = Resp, Error = Err1>,
|
||||
Fut2: IntoFuture<Item = S, Error = Err2>,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self::new(self.f.clone(), self.state.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// `AndThen` service combinator
|
||||
pub struct AndThen<A, B> {
|
||||
a: A,
|
||||
b: Rc<RefCell<B>>,
|
||||
}
|
||||
|
||||
impl<A, B> AndThen<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
/// Create new `AndThen` combinator
|
||||
pub fn new(a: A, b: B) -> Self {
|
||||
Self {
|
||||
a,
|
||||
b: Rc::new(RefCell::new(b)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Service for AndThen<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
type Request = A::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Future = AndThenFuture<A, B>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
let res = self.a.poll_ready();
|
||||
if let Ok(Async::Ready(_)) = res {
|
||||
self.b.borrow_mut().poll_ready()
|
||||
} else {
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
AndThenFuture::new(self.a.call(req), self.b.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AndThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
b: Rc<RefCell<B>>,
|
||||
fut_b: Option<B::Future>,
|
||||
fut_a: A::Future,
|
||||
}
|
||||
|
||||
impl<A, B> AndThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
fn new(fut_a: A::Future, b: Rc<RefCell<B>>) -> Self {
|
||||
AndThenFuture {
|
||||
b,
|
||||
fut_a,
|
||||
fut_b: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Future for AndThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
type Item = B::Response;
|
||||
type Error = B::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut_b {
|
||||
return fut.poll();
|
||||
}
|
||||
|
||||
match self.fut_a.poll()? {
|
||||
Async::Ready(resp) => {
|
||||
self.fut_b = Some(self.b.borrow_mut().call(resp));
|
||||
self.poll()
|
||||
}
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `AndThenNewService` new service combinator
|
||||
pub struct AndThenNewService<A, B> {
|
||||
a: A,
|
||||
b: B,
|
||||
}
|
||||
|
||||
impl<A, B> AndThenNewService<A, B>
|
||||
where
|
||||
A: NewService,
|
||||
B: NewService,
|
||||
{
|
||||
/// Create new `AndThen` combinator
|
||||
pub fn new<F: IntoNewService<B>>(a: A, f: F) -> Self {
|
||||
Self { a, b: f.into() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> NewService for AndThenNewService<A, B>
|
||||
where
|
||||
A: NewService<Response = B::Request, Error = B::Error, InitError = B::InitError>,
|
||||
B: NewService,
|
||||
{
|
||||
type Request = A::Request;
|
||||
type Response = B::Response;
|
||||
type Error = A::Error;
|
||||
type Service = AndThen<A::Service, B::Service>;
|
||||
|
||||
type InitError = A::InitError;
|
||||
type Future = AndThenNewServiceFuture<A, B>;
|
||||
|
||||
fn new_service(&self) -> Self::Future {
|
||||
AndThenNewServiceFuture::new(self.a.new_service(), self.b.new_service())
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Clone for AndThenNewService<A, B>
|
||||
where
|
||||
A: NewService<Response = B::Request, Error = B::Error, InitError = B::InitError> + Clone,
|
||||
B: NewService + Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
a: self.a.clone(),
|
||||
b: self.b.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AndThenNewServiceFuture<A, B>
|
||||
where
|
||||
A: NewService,
|
||||
B: NewService,
|
||||
{
|
||||
fut_b: B::Future,
|
||||
fut_a: A::Future,
|
||||
a: Option<A::Service>,
|
||||
b: Option<B::Service>,
|
||||
}
|
||||
|
||||
impl<A, B> AndThenNewServiceFuture<A, B>
|
||||
where
|
||||
A: NewService,
|
||||
B: NewService,
|
||||
{
|
||||
fn new(fut_a: A::Future, fut_b: B::Future) -> Self {
|
||||
AndThenNewServiceFuture {
|
||||
fut_a,
|
||||
fut_b,
|
||||
a: None,
|
||||
b: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Future for AndThenNewServiceFuture<A, B>
|
||||
where
|
||||
A: NewService<Response = B::Request, Error = B::Error, InitError = B::InitError>,
|
||||
B: NewService,
|
||||
{
|
||||
type Item = AndThen<A::Service, B::Service>;
|
||||
type Error = B::InitError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Async::Ready(service) = self.fut_a.poll()? {
|
||||
self.a = Some(service);
|
||||
}
|
||||
|
||||
if let Async::Ready(service) = self.fut_b.poll()? {
|
||||
self.b = Some(service);
|
||||
}
|
||||
|
||||
if self.a.is_some() && self.b.is_some() {
|
||||
Ok(Async::Ready(AndThen::new(
|
||||
self.a.take().unwrap(),
|
||||
self.b.take().unwrap(),
|
||||
)))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
15
src/ssl/mod.rs
Normal file
15
src/ssl/mod.rs
Normal file
@ -0,0 +1,15 @@
|
||||
//! SSL Services
|
||||
#[cfg(feature = "ssl")]
|
||||
mod openssl;
|
||||
#[cfg(feature = "ssl")]
|
||||
pub use self::openssl::OpensslService;
|
||||
|
||||
// #[cfg(feature = "tls")]
|
||||
// mod nativetls;
|
||||
// #[cfg(feature = "tls")]
|
||||
// pub use self::nativetls::{NativeTlsAcceptor, TlsStream};
|
||||
|
||||
// #[cfg(feature = "rust-tls")]
|
||||
// mod rustls;
|
||||
// #[cfg(feature = "rust-tls")]
|
||||
// pub use self::rustls::RustlsAcceptor;
|
125
src/ssl/openssl.rs
Normal file
125
src/ssl/openssl.rs
Normal file
@ -0,0 +1,125 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::net::Shutdown;
|
||||
use std::{io, time};
|
||||
|
||||
use futures::{future, future::FutureResult, Async, Future, Poll};
|
||||
use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream};
|
||||
use tokio_tcp::TcpStream;
|
||||
use tower_service::{NewService, Service};
|
||||
|
||||
use {IntoNewService, IoStream};
|
||||
|
||||
/// Support `SSL` connections via openssl package
|
||||
///
|
||||
/// `alpn` feature enables `OpensslAcceptor` type
|
||||
pub struct OpensslService<T> {
|
||||
acceptor: SslAcceptor,
|
||||
io: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> OpensslService<T> {
|
||||
/// Create default `OpensslService`
|
||||
pub fn new(builder: SslAcceptorBuilder) -> Self {
|
||||
OpensslService {
|
||||
acceptor: builder.build(),
|
||||
io: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create `OpensslWith` with `HTTP1.1` and `HTTP2`.
|
||||
pub fn for_http(mut builder: SslAcceptorBuilder) -> io::Result<Self> {
|
||||
let protos = b"\x08http/1.1\x02h2";
|
||||
|
||||
builder.set_alpn_select_callback(|_, protos| {
|
||||
const H2: &[u8] = b"\x02h2";
|
||||
if protos.windows(3).any(|window| window == H2) {
|
||||
Ok(b"h2")
|
||||
} else {
|
||||
Err(AlpnError::NOACK)
|
||||
}
|
||||
});
|
||||
builder.set_alpn_protos(&protos[..])?;
|
||||
|
||||
Ok(OpensslService {
|
||||
acceptor: builder.build(),
|
||||
io: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
impl<T: AsyncRead + AsyncWrite> Clone for OpensslService<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
acceptor: self.acceptor.clone(),
|
||||
io: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> NewService for OpensslService<T> {
|
||||
type Request = T;
|
||||
type Response = SslStream<T>;
|
||||
type Error = io::Error;
|
||||
type Service = OpensslAcceptor<T>;
|
||||
type InitError = io::Error;
|
||||
type Future = FutureResult<Self::Service, io::Error>;
|
||||
|
||||
fn new_service(&self) -> Self::Future {
|
||||
future::ok(OpensslAcceptor {
|
||||
acceptor: self.acceptor.clone(),
|
||||
io: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpensslAcceptor<T> {
|
||||
acceptor: SslAcceptor,
|
||||
io: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> Service for OpensslAcceptor<T> {
|
||||
type Request = T;
|
||||
type Response = SslStream<T>;
|
||||
type Error = io::Error;
|
||||
type Future = AcceptorFuture<T>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
AcceptorFuture(SslAcceptorExt::accept_async(&self.acceptor, req))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AcceptorFuture<T>(AcceptAsync<T>);
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> Future for AcceptorFuture<T> {
|
||||
type Item = SslStream<T>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.0
|
||||
.poll()
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
}
|
||||
|
||||
// impl<T: IoStream> IoStream for SslStream<T> {
|
||||
// #[inline]
|
||||
// fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
|
||||
// let _ = self.get_mut().shutdown();
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
// #[inline]
|
||||
// fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
|
||||
// self.get_mut().get_mut().set_nodelay(nodelay)
|
||||
// }
|
||||
|
||||
// #[inline]
|
||||
// fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
|
||||
// self.get_mut().get_mut().set_linger(dur)
|
||||
// }
|
||||
// }
|
152
src/worker.rs
Normal file
152
src/worker.rs
Normal file
@ -0,0 +1,152 @@
|
||||
use std::{net, time};
|
||||
|
||||
use futures::future;
|
||||
use futures::sync::mpsc::{SendError, UnboundedSender};
|
||||
use futures::sync::oneshot;
|
||||
|
||||
use actix::msgs::StopArbiter;
|
||||
use actix::{
|
||||
fut, Actor, ActorContext, ActorFuture, Arbiter, AsyncContext, Context, Handler, Message,
|
||||
Response, WrapFuture,
|
||||
};
|
||||
|
||||
use super::server_service::{BoxedServerService, ServerServiceFactory};
|
||||
use super::{server::Connections, Token};
|
||||
|
||||
#[derive(Message)]
|
||||
pub(crate) struct Conn {
|
||||
pub io: net::TcpStream,
|
||||
pub handler: Token,
|
||||
pub token: Token,
|
||||
pub peer: Option<net::SocketAddr>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct WorkerClient {
|
||||
pub idx: usize,
|
||||
tx: UnboundedSender<Conn>,
|
||||
conns: Connections,
|
||||
}
|
||||
|
||||
impl WorkerClient {
|
||||
pub fn new(idx: usize, tx: UnboundedSender<Conn>, conns: Connections) -> Self {
|
||||
WorkerClient { idx, tx, conns }
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: Conn) -> Result<(), SendError<Conn>> {
|
||||
self.tx.unbounded_send(msg)
|
||||
}
|
||||
|
||||
pub fn available(&self) -> bool {
|
||||
self.conns.available()
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop worker message. Returns `true` on successful shutdown
|
||||
/// and `false` if some connections still alive.
|
||||
pub(crate) struct StopWorker {
|
||||
pub graceful: Option<time::Duration>,
|
||||
}
|
||||
|
||||
impl Message for StopWorker {
|
||||
type Result = Result<bool, ()>;
|
||||
}
|
||||
|
||||
/// Http worker
|
||||
///
|
||||
/// Worker accepts Socket objects via unbounded channel and start requests
|
||||
/// processing.
|
||||
pub(crate) struct Worker {
|
||||
// conns: Connections,
|
||||
services: Vec<BoxedServerService>,
|
||||
}
|
||||
|
||||
impl Actor for Worker {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub(crate) fn new(
|
||||
ctx: &mut Context<Self>, services: Vec<Box<ServerServiceFactory + Send>>,
|
||||
) -> Self {
|
||||
let wrk = Worker {
|
||||
services: Vec::new(),
|
||||
};
|
||||
|
||||
ctx.wait(
|
||||
future::join_all(services.into_iter().map(|s| s.create()))
|
||||
.into_actor(&wrk)
|
||||
.map_err(|e, _, ctx| {
|
||||
error!("Can not start worker: {:?}", e);
|
||||
Arbiter::current().do_send(StopArbiter(0));
|
||||
ctx.stop();
|
||||
}).and_then(|services, act, _| {
|
||||
act.services.extend(services);
|
||||
fut::ok(())
|
||||
}),
|
||||
);
|
||||
|
||||
wrk
|
||||
}
|
||||
|
||||
fn shutdown(&self, _force: bool) {
|
||||
// self.services.iter().for_each(|h| h.shutdown(force));
|
||||
}
|
||||
|
||||
fn shutdown_timeout(
|
||||
&self, _ctx: &mut Context<Worker>, _tx: oneshot::Sender<bool>, _dur: time::Duration,
|
||||
) {
|
||||
// sleep for 1 second and then check again
|
||||
// ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
|
||||
// let num = slf.conns.num_connections();
|
||||
// if num == 0 {
|
||||
// let _ = tx.send(true);
|
||||
// Arbiter::current().do_send(StopArbiter(0));
|
||||
// } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
|
||||
// slf.shutdown_timeout(ctx, tx, d);
|
||||
// } else {
|
||||
// info!("Force shutdown http worker, {} connections", num);
|
||||
// slf.shutdown(true);
|
||||
// let _ = tx.send(false);
|
||||
// Arbiter::current().do_send(StopArbiter(0));
|
||||
// }
|
||||
// });
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Conn> for Worker {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
|
||||
Arbiter::spawn(self.services[msg.handler.0].call(msg.io))
|
||||
}
|
||||
}
|
||||
|
||||
/// `StopWorker` message handler
|
||||
impl Handler<StopWorker> for Worker {
|
||||
type Result = Response<bool, ()>;
|
||||
|
||||
fn handle(&mut self, _msg: StopWorker, _ctx: &mut Context<Self>) -> Self::Result {
|
||||
unimplemented!()
|
||||
// let num = self.conns.num_connections();
|
||||
// if num == 0 {
|
||||
// info!("Shutting down http worker, 0 connections");
|
||||
// Response::reply(Ok(true))
|
||||
// } else if let Some(dur) = msg.graceful {
|
||||
// self.shutdown(false);
|
||||
// let (tx, rx) = oneshot::channel();
|
||||
// let num = self.conns.num_connections();
|
||||
// if num != 0 {
|
||||
// info!("Graceful http worker shutdown, {} connections", num);
|
||||
// self.shutdown_timeout(ctx, tx, dur);
|
||||
// Response::reply(Ok(true))
|
||||
// } else {
|
||||
// Response::async(rx.map_err(|_| ()))
|
||||
// }
|
||||
// } else {
|
||||
// info!("Force shutdown http worker, {} connections", num);
|
||||
// self.shutdown(true);
|
||||
// Response::reply(Ok(false))
|
||||
// }
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user