1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-30 20:04:26 +02:00

clippy warnings; fmt

This commit is contained in:
Nikolay Kim
2018-04-28 22:55:47 -07:00
parent a38c3985f6
commit de49796fd1
67 changed files with 988 additions and 1866 deletions

View File

@ -7,7 +7,7 @@ use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use super::settings::WorkerSettings;
use super::{utils, HttpHandler, IoStream, h1, h2};
use super::{h1, h2, utils, HttpHandler, IoStream};
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
@ -93,12 +93,12 @@ where
let el = self as *mut _;
self.node = Some(Node::new(el));
let _ = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => self.node
.as_ref()
.map(|n| h1.settings().head().insert(n)),
Some(HttpProtocol::H2(ref mut h2)) => self.node
.as_ref()
.map(|n| h2.settings().head().insert(n)),
Some(HttpProtocol::H1(ref mut h1)) => {
self.node.as_ref().map(|n| h1.settings().head().insert(n))
}
Some(HttpProtocol::H2(ref mut h2)) => {
self.node.as_ref().map(|n| h2.settings().head().insert(n))
}
Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) => {
self.node.as_ref().map(|n| settings.head().insert(n))
}
@ -112,7 +112,9 @@ where
match result {
Ok(Async::Ready(())) | Err(_) => {
h1.settings().remove_channel();
self.node.as_mut().map(|n| n.remove());
if let Some(n) = self.node.as_mut() {
n.remove()
};
}
_ => (),
}
@ -123,7 +125,9 @@ where
match result {
Ok(Async::Ready(())) | Err(_) => {
h2.settings().remove_channel();
self.node.as_mut().map(|n| n.remove());
if let Some(n) = self.node.as_mut() {
n.remove()
};
}
_ => (),
}
@ -139,7 +143,9 @@ where
Ok(Async::Ready(0)) | Err(_) => {
debug!("Ignored premature client disconnection");
settings.remove_channel();
self.node.as_mut().map(|n| n.remove());
if let Some(n) = self.node.as_mut() {
n.remove()
};
return Err(());
}
_ => (),
@ -162,12 +168,8 @@ where
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
match kind {
ProtocolKind::Http1 => {
self.proto = Some(HttpProtocol::H1(h1::Http1::new(
settings,
io,
addr,
buf,
)));
self.proto =
Some(HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf)));
return self.poll();
}
ProtocolKind::Http2 => {
@ -204,7 +206,8 @@ impl<T> Node<T> {
#[allow(mutable_transmutes)]
unsafe {
if let Some(ref next2) = self.next {
let n: &mut Node<()> = mem::transmute(next2.as_ref().unwrap());
let n: &mut Node<()> =
&mut *(next2.as_ref().unwrap() as *const _ as *mut _);
n.prev = Some(next as *const _ as *mut _);
}
let slf: &mut Node<T> = mem::transmute(self);
@ -275,7 +278,9 @@ where
T: AsyncRead + AsyncWrite + 'static,
{
pub fn new(io: T) -> Self {
WrapperStream { io }
WrapperStream {
io,
}
}
}

View File

@ -763,8 +763,7 @@ impl TransferEncoding {
return Ok(*remaining == 0);
}
let len = cmp::min(*remaining, msg.len() as u64);
self.buffer
.extend(msg.take().split_to(len as usize).into());
self.buffer.extend(msg.take().split_to(len as usize).into());
*remaining -= len as u64;
Ok(*remaining == 0)
@ -856,10 +855,8 @@ impl AcceptEncoding {
/// Parse a raw Accept-Encoding header value into an ordered list.
pub fn parse(raw: &str) -> ContentEncoding {
let mut encodings: Vec<_> = raw.replace(' ', "")
.split(',')
.map(|l| AcceptEncoding::new(l))
.collect();
let mut encodings: Vec<_> =
raw.replace(' ', "").split(',').map(|l| AcceptEncoding::new(l)).collect();
encodings.sort();
for enc in encodings {
@ -879,9 +876,7 @@ mod tests {
fn test_chunked_te() {
let bytes = SharedBytes::default();
let mut enc = TransferEncoding::chunked(bytes.clone());
assert!(!enc.encode(Binary::from(b"test".as_ref()))
.ok()
.unwrap());
assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap());
assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap());
assert_eq!(
bytes.get_mut().take().freeze(),

View File

@ -210,8 +210,7 @@ where
self.stream.reset();
if ready {
item.flags
.insert(EntryFlags::EOF | EntryFlags::FINISHED);
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::FINISHED);
}
@ -253,10 +252,7 @@ where
// cleanup finished tasks
let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES;
while !self.tasks.is_empty() {
if self.tasks[0]
.flags
.contains(EntryFlags::EOF | EntryFlags::FINISHED)
{
if self.tasks[0].flags.contains(EntryFlags::EOF | EntryFlags::FINISHED) {
self.tasks.pop_front();
} else {
break;
@ -308,7 +304,10 @@ where
pub fn parse(&mut self) {
'outer: loop {
match self.decoder.decode(&mut self.buf, &self.settings) {
Ok(Some(Message::Message { msg, payload })) => {
Ok(Some(Message::Message {
msg,
payload,
})) => {
self.flags.insert(Flags::STARTED);
if payload {
@ -421,13 +420,19 @@ mod tests {
impl Message {
fn message(self) -> SharedHttpInnerMessage {
match self {
Message::Message { msg, payload: _ } => msg,
Message::Message {
msg,
payload: _,
} => msg,
_ => panic!("error"),
}
}
fn is_payload(&self) -> bool {
match *self {
Message::Message { msg: _, payload } => payload,
Message::Message {
msg: _,
payload,
} => payload,
_ => panic!("error"),
}
}
@ -623,10 +628,7 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test");
assert_eq!(
req.headers().get("test").unwrap().as_bytes(),
b"value"
);
assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value");
}
Ok(_) | Err(_) => unreachable!("Error during parsing http request"),
}
@ -848,12 +850,7 @@ mod tests {
assert!(!req.keep_alive());
assert!(req.upgrade());
assert_eq!(
reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.chunk()
.as_ref(),
reader.decode(&mut buf, &settings).unwrap().unwrap().chunk().as_ref(),
b"some raw data"
);
}
@ -910,30 +907,14 @@ mod tests {
buf.extend(b"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n");
assert_eq!(
reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.chunk()
.as_ref(),
reader.decode(&mut buf, &settings).unwrap().unwrap().chunk().as_ref(),
b"data"
);
assert_eq!(
reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.chunk()
.as_ref(),
reader.decode(&mut buf, &settings).unwrap().unwrap().chunk().as_ref(),
b"line"
);
assert!(
reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.eof()
);
assert!(reader.decode(&mut buf, &settings).unwrap().unwrap().eof());
}
#[test]
@ -1014,13 +995,7 @@ mod tests {
assert!(reader.decode(&mut buf, &settings).unwrap().is_none());
buf.extend(b"\r\n");
assert!(
reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.eof()
);
assert!(reader.decode(&mut buf, &settings).unwrap().unwrap().eof());
}
#[test]
@ -1038,17 +1013,9 @@ mod tests {
assert!(req.chunked().unwrap());
buf.extend(b"4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n")
let chunk = reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.chunk();
let chunk = reader.decode(&mut buf, &settings).unwrap().unwrap().chunk();
assert_eq!(chunk, Bytes::from_static(b"data"));
let chunk = reader
.decode(&mut buf, &settings)
.unwrap()
.unwrap()
.chunk();
let chunk = reader.decode(&mut buf, &settings).unwrap().unwrap().chunk();
assert_eq!(chunk, Bytes::from_static(b"line"));
let msg = reader.decode(&mut buf, &settings).unwrap().unwrap();
assert!(msg.eof());

View File

@ -41,7 +41,9 @@ impl From<io::Error> for DecoderError {
impl H1Decoder {
pub fn new() -> H1Decoder {
H1Decoder { decoder: None }
H1Decoder {
decoder: None,
}
}
pub fn decode<H>(
@ -59,9 +61,7 @@ impl H1Decoder {
}
}
match self.parse_message(src, settings)
.map_err(DecoderError::Error)?
{
match self.parse_message(src, settings).map_err(DecoderError::Error)? {
Async::Ready((msg, decoder)) => {
if let Some(decoder) = decoder {
self.decoder = Some(decoder);
@ -103,7 +103,7 @@ impl H1Decoder {
let (len, method, path, version, headers_len) = {
let b = unsafe {
let b: &[u8] = buf;
mem::transmute(b)
&*(b as *const [u8])
};
let mut req = httparse::Request::new(&mut headers);
match req.parse(b)? {
@ -415,10 +415,9 @@ impl ChunkedState {
match byte!(rdr) {
b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)),
b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk size LF",
)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF"))
}
}
}
@ -451,37 +450,33 @@ impl ChunkedState {
fn read_body_cr(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk body CR",
)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR"))
}
}
}
fn read_body_lf(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::Size)),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk body LF",
)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF"))
}
}
}
fn read_end_cr(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::EndLf)),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk end CR",
)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR"))
}
}
}
fn read_end_lf(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::End)),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk end LF",
)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF"))
}
}
}
}

View File

@ -2,8 +2,8 @@
use bytes::BufMut;
use futures::{Async, Poll};
use std::io;
use std::rc::Rc;
use std::{io, mem};
use tokio_io::AsyncWrite;
use super::encoding::ContentEncoder;
@ -13,10 +13,10 @@ use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body};
use header::ContentEncoding;
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
use http::{Method, Version};
use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -42,7 +42,7 @@ pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
pub fn new(
stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>
stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>,
) -> H1Writer<T, H> {
H1Writer {
flags: Flags::empty(),
@ -117,8 +117,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
let version = msg.version().unwrap_or_else(|| req.version);
if msg.upgrade() {
self.flags.insert(Flags::UPGRADE);
msg.headers_mut()
.insert(CONNECTION, HeaderValue::from_static("upgrade"));
msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("upgrade"));
}
// keep-alive
else if self.flags.contains(Flags::KEEPALIVE) {
@ -127,8 +126,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
}
} else if version >= Version::HTTP_11 {
msg.headers_mut()
.insert(CONNECTION, HeaderValue::from_static("close"));
msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("close"));
}
let body = msg.replace_body(Body::Empty);
@ -169,7 +167,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
let mut pos = 0;
let mut has_date = false;
let mut remaining = buffer.remaining_mut();
let mut buf: &mut [u8] = unsafe { mem::transmute(buffer.bytes_mut()) };
let mut buf = unsafe { &mut *(buffer.bytes_mut() as *mut [u8]) };
for (key, value) in msg.headers() {
if is_bin && key == CONTENT_LENGTH {
is_bin = false;
@ -184,7 +182,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
pos = 0;
buffer.reserve(len);
remaining = buffer.remaining_mut();
buf = unsafe { mem::transmute(buffer.bytes_mut()) };
buf = unsafe { &mut *(buffer.bytes_mut() as *mut _) };
}
buf[pos..pos + k.len()].copy_from_slice(k);
@ -272,7 +270,8 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
if !self.buffer.is_empty() {
let buf: &[u8] = unsafe { mem::transmute(self.buffer.as_ref()) };
let buf: &[u8] =
unsafe { &mut *(self.buffer.as_ref() as *const _ as *mut _) };
let written = self.write_data(buf)?;
let _ = self.buffer.split_to(written);
if self.buffer.len() > self.buffer_capacity {

View File

@ -61,7 +61,7 @@ where
H: HttpHandler + 'static,
{
pub fn new(
settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes
settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes,
) -> Self {
Http2 {
flags: Flags::empty(),

View File

@ -45,7 +45,7 @@ pub(crate) struct H2Writer<H: 'static> {
impl<H: 'static> H2Writer<H> {
pub fn new(
respond: SendResponse<Bytes>, buf: SharedBytes, settings: Rc<WorkerSettings<H>>
respond: SendResponse<Bytes>, buf: SharedBytes, settings: Rc<WorkerSettings<H>>,
) -> H2Writer<H> {
H2Writer {
respond,
@ -107,8 +107,7 @@ impl<H: 'static> Writer for H2Writer<H> {
);
}
Body::Empty => {
msg.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
msg.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
}
_ => (),
}
@ -120,9 +119,7 @@ impl<H: 'static> Writer for H2Writer<H> {
resp.headers_mut().insert(key, value.clone());
}
match self.respond
.send_response(resp, self.flags.contains(Flags::EOF))
{
match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) {
Ok(stream) => self.stream = Some(stream),
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "err")),
}

View File

@ -69,7 +69,7 @@ impl SharedHttpInnerMessage {
}
pub fn new(
msg: Rc<HttpInnerMessage>, pool: Rc<SharedMessagePool>
msg: Rc<HttpInnerMessage>, pool: Rc<SharedMessagePool>,
) -> SharedHttpInnerMessage {
SharedHttpInnerMessage(Some(msg), Some(pool))
}
@ -79,7 +79,7 @@ impl SharedHttpInnerMessage {
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut HttpInnerMessage {
let r: &HttpInnerMessage = self.0.as_ref().unwrap().as_ref();
unsafe { mem::transmute(r) }
unsafe { &mut *(r as *const _ as *mut _) }
}
#[inline(always)]
@ -96,9 +96,8 @@ const DEC_DIGITS_LUT: &[u8] = b"0001020304050607080910111213141516171819\
8081828384858687888990919293949596979899";
pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesMut) {
let mut buf: [u8; 13] = [
b'H', b'T', b'T', b'P', b'/', b'1', b'.', b'1', b' ', b' ', b' ', b' ', b' '
];
let mut buf: [u8; 13] =
[b'H', b'T', b'T', b'P', b'/', b'1', b'.', b'1', b' ', b' ', b' ', b' ', b' '];
match version {
Version::HTTP_2 => buf[5] = b'2',
Version::HTTP_10 => buf[7] = b'0',
@ -251,63 +250,33 @@ mod tests {
let mut bytes = BytesMut::new();
bytes.reserve(50);
write_content_length(0, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 0\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 0\r\n"[..]);
bytes.reserve(50);
write_content_length(9, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 9\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 9\r\n"[..]);
bytes.reserve(50);
write_content_length(10, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 10\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 10\r\n"[..]);
bytes.reserve(50);
write_content_length(99, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 99\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 99\r\n"[..]);
bytes.reserve(50);
write_content_length(100, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 100\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 100\r\n"[..]);
bytes.reserve(50);
write_content_length(101, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 101\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 101\r\n"[..]);
bytes.reserve(50);
write_content_length(998, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 998\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 998\r\n"[..]);
bytes.reserve(50);
write_content_length(1000, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 1000\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 1000\r\n"[..]);
bytes.reserve(50);
write_content_length(1001, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 1001\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 1001\r\n"[..]);
bytes.reserve(50);
write_content_length(5909, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 5909\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 5909\r\n"[..]);
}
}

View File

@ -8,10 +8,10 @@ use std::sync::Arc;
use std::{fmt, mem, net};
use time;
use super::KeepAlive;
use super::channel::Node;
use super::helpers;
use super::shared::{SharedBytes, SharedBytesPool};
use super::KeepAlive;
use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
@ -72,7 +72,7 @@ impl Default for ServerSettings {
impl ServerSettings {
/// Crate server settings instance
pub(crate) fn new(
addr: Option<net::SocketAddr>, host: &Option<String>, secure: bool
addr: Option<net::SocketAddr>, host: &Option<String>, secure: bool,
) -> ServerSettings {
let host = if let Some(ref host) = *host {
host.clone()
@ -119,7 +119,7 @@ impl ServerSettings {
#[inline]
pub(crate) fn get_response_builder(
&self, status: StatusCode
&self, status: StatusCode,
) -> HttpResponseBuilder {
HttpResponsePool::get_builder(&self.responses, status)
}
@ -255,10 +255,7 @@ mod tests {
#[test]
fn test_date_len() {
assert_eq!(
DATE_VALUE_LENGTH,
"Sun, 06 Nov 1994 08:49:37 GMT".len()
);
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]

View File

@ -1,8 +1,8 @@
use bytes::{BufMut, BytesMut};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io;
use std::rc::Rc;
use std::{io, mem};
use body::Binary;
@ -61,7 +61,7 @@ impl SharedBytes {
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub(crate) fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe { mem::transmute(r) }
unsafe { &mut *(r as *const _ as *mut _) }
}
#[inline]

View File

@ -219,10 +219,7 @@ where
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
Err(io::Error::new(io::ErrorKind::Other, "Can not bind to address."))
}
} else {
Ok(self)
@ -230,7 +227,7 @@ where
}
fn start_workers(
&mut self, settings: &ServerSettings, handler: &StreamHandlerType
&mut self, settings: &ServerSettings, handler: &StreamHandlerType,
) -> Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)> {
// start workers
let mut workers = Vec::new();
@ -332,9 +329,9 @@ impl<H: IntoHttpHandler> HttpServer<H> {
ctx.add_stream(rx);
self
});
signals.map(|signals| {
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
});
}
addr
}
}
@ -378,10 +375,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming tls connections.
pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result<Addr<Syn, Self>> {
if self.sockets.is_empty() {
Err(io::Error::new(
io::ErrorKind::Other,
"No socket addresses are bound",
))
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
@ -427,13 +421,10 @@ impl<H: IntoHttpHandler> HttpServer<H> {
///
/// This method sets alpn protocols to "h2" and "http/1.1"
pub fn start_ssl(
mut self, mut builder: SslAcceptorBuilder
mut self, mut builder: SslAcceptorBuilder,
) -> io::Result<Addr<Syn, Self>> {
if self.sockets.is_empty() {
Err(io::Error::new(
io::ErrorKind::Other,
"No socket addresses are bound",
))
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
// alpn support
if !self.no_http2 {
@ -545,8 +536,9 @@ impl<H: IntoHttpHandler> HttpServer<H> {
}));
self
});
signals
.map(|signals| signals.do_send(signal::Subscribe(addr.clone().recipient())));
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
addr
}
}
@ -562,17 +554,35 @@ impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H> {
signal::SignalType::Int => {
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
Handler::<StopServer>::handle(
self,
StopServer {
graceful: false,
},
ctx,
);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
Handler::<StopServer>::handle(
self,
StopServer {
graceful: true,
},
ctx,
);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
Handler::<StopServer>::handle(
self,
StopServer {
graceful: false,
},
ctx,
);
}
_ => (),
}
@ -696,7 +706,9 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H> {
let tx2 = tx.clone();
worker
.1
.send(StopWorker { graceful: dur })
.send(StopWorker {
graceful: dur,
})
.into_actor(self)
.then(move |_, slf, ctx| {
slf.workers.pop();
@ -746,9 +758,8 @@ fn start_accept_thread(
// start accept thread
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
let _ = thread::Builder::new()
.name(format!("Accept on {}", addr))
.spawn(move || {
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(
move || {
const SRV: mio::Token = mio::Token(0);
const CMD: mio::Token = mio::Token(1);
@ -773,12 +784,9 @@ fn start_accept_thread(
}
// Start listening for incoming commands
if let Err(err) = poll.register(
&reg,
CMD,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
if let Err(err) =
poll.register(&reg, CMD, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
@ -909,13 +917,14 @@ fn start_accept_thread(
}
}
}
});
},
);
(readiness, tx)
}
fn create_tcp_listener(
addr: net::SocketAddr, backlog: i32
addr: net::SocketAddr, backlog: i32,
) -> io::Result<net::TcpListener> {
let builder = match addr {
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,

View File

@ -8,7 +8,7 @@ const LW_BUFFER_SIZE: usize = 4096;
const HW_BUFFER_SIZE: usize = 32_768;
pub fn read_from_io<T: IoStream>(
io: &mut T, buf: &mut BytesMut
io: &mut T, buf: &mut BytesMut,
) -> Poll<usize, io::Error> {
unsafe {
if buf.remaining_mut() < LW_BUFFER_SIZE {

View File

@ -1,5 +1,5 @@
use futures::Future;
use futures::unsync::oneshot;
use futures::Future;
use net2::TcpStreamExt;
use std::rc::Rc;
use std::{net, time};
@ -59,7 +59,7 @@ where
impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new(
h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive
h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive,
) -> Worker<H> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
@ -77,13 +77,11 @@ impl<H: HttpHandler + 'static> Worker<H> {
fn update_time(&self, ctx: &mut Context<Self>) {
self.settings.update_date();
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| {
slf.update_time(ctx)
});
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
fn shutdown_timeout(
&self, ctx: &mut Context<Self>, tx: oneshot::Sender<bool>, dur: time::Duration
&self, ctx: &mut Context<Self>, tx: oneshot::Sender<bool>, dur: time::Duration,
) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
@ -124,8 +122,7 @@ where
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
error!("Can not set socket keep-alive option");
}
self.handler
.handle(Rc::clone(&self.settings), &self.hnd, msg);
self.handler.handle(Rc::clone(&self.settings), &self.hnd, msg);
}
}
@ -165,7 +162,7 @@ pub(crate) enum StreamHandlerType {
impl StreamHandlerType {
fn handle<H: HttpHandler>(
&mut self, h: Rc<WorkerSettings<H>>, hnd: &Handle, msg: Conn<net::TcpStream>
&mut self, h: Rc<WorkerSettings<H>>, hnd: &Handle, msg: Conn<net::TcpStream>,
) {
match *self {
StreamHandlerType::Normal => {
@ -177,60 +174,57 @@ impl StreamHandlerType {
}
#[cfg(feature = "tls")]
StreamHandlerType::Tls(ref acceptor) => {
let Conn { io, peer, http2 } = msg;
let Conn {
io,
peer,
http2,
} = msg;
let _ = io.set_nodelay(true);
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(HttpChannel::new(
h,
io,
peer,
http2,
)),
Err(err) => {
trace!("Error during handling tls connection: {}", err)
}
};
future::result(Ok(()))
}),
);
hnd.spawn(TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2))
}
Err(err) => {
trace!("Error during handling tls connection: {}", err)
}
};
future::result(Ok(()))
}));
}
#[cfg(feature = "alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let Conn { io, peer, .. } = msg;
let Conn {
io,
peer,
..
} = msg;
let _ = io.set_nodelay(true);
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) =
io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(
h,
io,
peer,
http2,
));
}
Err(err) => {
trace!("Error during handling tls connection: {}", err)
}
};
future::result(Ok(()))
}),
);
hnd.spawn(SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) =
io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
Arbiter::handle()
.spawn(HttpChannel::new(h, io, peer, http2));
}
Err(err) => {
trace!("Error during handling tls connection: {}", err)
}
};
future::result(Ok(()))
}));
}
}
}