mirror of
https://github.com/actix/actix-extras.git
synced 2025-06-30 03:44:27 +02:00
migrate to tokio
This commit is contained in:
@ -2,12 +2,11 @@ use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::rc::Rc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use actix::Arbiter;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use futures::{Async, Future, Poll};
|
||||
use tokio_core::reactor::Timeout;
|
||||
use tokio_timer::Delay;
|
||||
|
||||
use error::PayloadError;
|
||||
use httprequest::HttpRequest;
|
||||
@ -53,7 +52,7 @@ pub(crate) struct Http1<T: IoStream, H: 'static> {
|
||||
payload: Option<PayloadType>,
|
||||
buf: BytesMut,
|
||||
tasks: VecDeque<Entry>,
|
||||
keepalive_timer: Option<Timeout>,
|
||||
keepalive_timer: Option<Delay>,
|
||||
}
|
||||
|
||||
struct Entry {
|
||||
@ -295,8 +294,7 @@ where
|
||||
if self.keepalive_timer.is_none() && keep_alive > 0 {
|
||||
trace!("Start keep-alive timer");
|
||||
let mut timer =
|
||||
Timeout::new(Duration::new(keep_alive, 0), Arbiter::handle())
|
||||
.unwrap();
|
||||
Delay::new(Instant::now() + Duration::new(keep_alive, 0));
|
||||
// register timer
|
||||
let _ = timer.poll();
|
||||
self.keepalive_timer = Some(timer);
|
||||
|
@ -4,17 +4,16 @@ use std::collections::VecDeque;
|
||||
use std::io::{Read, Write};
|
||||
use std::net::SocketAddr;
|
||||
use std::rc::Rc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{cmp, io, mem};
|
||||
|
||||
use actix::Arbiter;
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use http2::server::{self, Connection, Handshake, SendResponse};
|
||||
use http2::{Reason, RecvStream};
|
||||
use modhttp::request::Parts;
|
||||
use tokio_core::reactor::Timeout;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::Delay;
|
||||
|
||||
use error::PayloadError;
|
||||
use httpmessage::HttpMessage;
|
||||
@ -46,7 +45,7 @@ where
|
||||
addr: Option<SocketAddr>,
|
||||
state: State<IoWrapper<T>>,
|
||||
tasks: VecDeque<Entry<H>>,
|
||||
keepalive_timer: Option<Timeout>,
|
||||
keepalive_timer: Option<Delay>,
|
||||
}
|
||||
|
||||
enum State<T: AsyncRead + AsyncWrite> {
|
||||
@ -218,9 +217,10 @@ where
|
||||
let keep_alive = self.settings.keep_alive();
|
||||
if keep_alive > 0 && self.keepalive_timer.is_none() {
|
||||
trace!("Start keep-alive timer");
|
||||
let mut timeout = Timeout::new(
|
||||
Duration::new(keep_alive, 0),
|
||||
Arbiter::handle()).unwrap();
|
||||
let mut timeout = Delay::new(
|
||||
Instant::now()
|
||||
+ Duration::new(keep_alive, 0),
|
||||
);
|
||||
// register timeout
|
||||
let _ = timeout.poll();
|
||||
self.keepalive_timer = Some(timeout);
|
||||
|
@ -5,8 +5,8 @@ use std::{io, time};
|
||||
use actix;
|
||||
use bytes::BytesMut;
|
||||
use futures::{Async, Poll};
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tcp::TcpStream;
|
||||
|
||||
mod channel;
|
||||
pub(crate) mod encoding;
|
||||
|
@ -563,11 +563,10 @@ impl<H: IntoHttpHandler> HttpServer<H> {
|
||||
/// Start listening for incoming connections from a stream.
|
||||
///
|
||||
/// This method uses only one thread for handling incoming connections.
|
||||
pub fn start_incoming<T, A, S>(mut self, stream: S, secure: bool) -> Addr<Syn, Self>
|
||||
pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Syn, Self>
|
||||
where
|
||||
S: Stream<Item = (T, A), Error = io::Error> + 'static,
|
||||
S: Stream<Item = T, Error = io::Error> + 'static,
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
A: 'static,
|
||||
{
|
||||
// set server settings
|
||||
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
|
||||
@ -581,7 +580,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
|
||||
// start server
|
||||
let signals = self.subscribe_to_signals();
|
||||
let addr: Addr<Syn, _> = HttpServer::create(move |ctx| {
|
||||
ctx.add_message_stream(stream.map_err(|_| ()).map(move |(t, _)| Conn {
|
||||
ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn {
|
||||
io: WrapperStream::new(t),
|
||||
token: 0,
|
||||
peer: None,
|
||||
@ -687,7 +686,7 @@ where
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
|
||||
Arbiter::handle().spawn(HttpChannel::new(
|
||||
Arbiter::spawn(HttpChannel::new(
|
||||
Rc::clone(self.h.as_ref().unwrap()),
|
||||
msg.io,
|
||||
msg.peer,
|
||||
|
@ -4,8 +4,8 @@ use net2::TcpStreamExt;
|
||||
use slab::Slab;
|
||||
use std::rc::Rc;
|
||||
use std::{net, time};
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::reactor::Handle;
|
||||
use tokio_reactor::Handle;
|
||||
use tokio_tcp::TcpStream;
|
||||
|
||||
#[cfg(any(feature = "tls", feature = "alpn"))]
|
||||
use futures::future;
|
||||
@ -60,7 +60,6 @@ where
|
||||
H: HttpHandler + 'static,
|
||||
{
|
||||
settings: Rc<WorkerSettings<H>>,
|
||||
hnd: Handle,
|
||||
socks: Slab<SocketInfo>,
|
||||
tcp_ka: Option<time::Duration>,
|
||||
}
|
||||
@ -77,7 +76,6 @@ impl<H: HttpHandler + 'static> Worker<H> {
|
||||
|
||||
Worker {
|
||||
settings: Rc::new(WorkerSettings::new(h, keep_alive)),
|
||||
hnd: Arbiter::handle().clone(),
|
||||
socks,
|
||||
tcp_ka,
|
||||
}
|
||||
@ -130,11 +128,11 @@ where
|
||||
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
|
||||
error!("Can not set socket keep-alive option");
|
||||
}
|
||||
self.socks.get_mut(msg.token).unwrap().htype.handle(
|
||||
Rc::clone(&self.settings),
|
||||
&self.hnd,
|
||||
msg,
|
||||
);
|
||||
self.socks
|
||||
.get_mut(msg.token)
|
||||
.unwrap()
|
||||
.htype
|
||||
.handle(Rc::clone(&self.settings), msg);
|
||||
}
|
||||
}
|
||||
|
||||
@ -174,15 +172,15 @@ pub(crate) enum StreamHandlerType {
|
||||
|
||||
impl StreamHandlerType {
|
||||
fn handle<H: HttpHandler>(
|
||||
&mut self, h: Rc<WorkerSettings<H>>, hnd: &Handle, msg: Conn<net::TcpStream>,
|
||||
&mut self, h: Rc<WorkerSettings<H>>, msg: Conn<net::TcpStream>,
|
||||
) {
|
||||
match *self {
|
||||
StreamHandlerType::Normal => {
|
||||
let _ = msg.io.set_nodelay(true);
|
||||
let io = TcpStream::from_stream(msg.io, hnd)
|
||||
let io = TcpStream::from_std(msg.io, &Handle::default())
|
||||
.expect("failed to associate TCP stream");
|
||||
|
||||
hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
|
||||
Arbiter::spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
|
||||
}
|
||||
#[cfg(feature = "tls")]
|
||||
StreamHandlerType::Tls(ref acceptor) => {
|
||||
@ -190,47 +188,50 @@ impl StreamHandlerType {
|
||||
io, peer, http2, ..
|
||||
} = msg;
|
||||
let _ = io.set_nodelay(true);
|
||||
let io = TcpStream::from_stream(io, hnd)
|
||||
let io = TcpStream::from_std(io, &Handle::default())
|
||||
.expect("failed to associate TCP stream");
|
||||
|
||||
hnd.spawn(TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
|
||||
match res {
|
||||
Ok(io) => {
|
||||
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2))
|
||||
}
|
||||
Err(err) => {
|
||||
trace!("Error during handling tls connection: {}", err)
|
||||
}
|
||||
};
|
||||
future::result(Ok(()))
|
||||
}));
|
||||
Arbiter::spawn(TlsAcceptorExt::accept_async(acceptor, io).then(
|
||||
move |res| {
|
||||
match res {
|
||||
Ok(io) => {
|
||||
Arbiter::spawn(HttpChannel::new(h, io, peer, http2))
|
||||
}
|
||||
Err(err) => {
|
||||
trace!("Error during handling tls connection: {}", err)
|
||||
}
|
||||
};
|
||||
future::result(Ok(()))
|
||||
},
|
||||
));
|
||||
}
|
||||
#[cfg(feature = "alpn")]
|
||||
StreamHandlerType::Alpn(ref acceptor) => {
|
||||
let Conn { io, peer, .. } = msg;
|
||||
let _ = io.set_nodelay(true);
|
||||
let io = TcpStream::from_stream(io, hnd)
|
||||
let io = TcpStream::from_std(io, &Handle::default())
|
||||
.expect("failed to associate TCP stream");
|
||||
|
||||
hnd.spawn(SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
|
||||
match res {
|
||||
Ok(io) => {
|
||||
let http2 = if let Some(p) =
|
||||
io.get_ref().ssl().selected_alpn_protocol()
|
||||
{
|
||||
p.len() == 2 && &p == b"h2"
|
||||
} else {
|
||||
false
|
||||
};
|
||||
Arbiter::handle()
|
||||
.spawn(HttpChannel::new(h, io, peer, http2));
|
||||
}
|
||||
Err(err) => {
|
||||
trace!("Error during handling tls connection: {}", err)
|
||||
}
|
||||
};
|
||||
future::result(Ok(()))
|
||||
}));
|
||||
Arbiter::spawn(SslAcceptorExt::accept_async(acceptor, io).then(
|
||||
move |res| {
|
||||
match res {
|
||||
Ok(io) => {
|
||||
let http2 = if let Some(p) =
|
||||
io.get_ref().ssl().selected_alpn_protocol()
|
||||
{
|
||||
p.len() == 2 && &p == b"h2"
|
||||
} else {
|
||||
false
|
||||
};
|
||||
Arbiter::spawn(HttpChannel::new(h, io, peer, http2));
|
||||
}
|
||||
Err(err) => {
|
||||
trace!("Error during handling tls connection: {}", err)
|
||||
}
|
||||
};
|
||||
future::result(Ok(()))
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user