1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-29 19:24:58 +02:00

cleanup warnings

This commit is contained in:
Nikolay Kim
2018-07-04 21:01:27 +06:00
parent 4c5a63965e
commit 6fd686ef98
39 changed files with 116 additions and 226 deletions

View File

@ -2,7 +2,6 @@ use futures::{Async, Poll};
use super::{helpers, HttpHandlerTask, Writer};
use http::{StatusCode, Version};
use httpresponse::HttpResponse;
use Error;
pub(crate) struct ServerError(Version, StatusCode);
@ -16,7 +15,7 @@ impl ServerError {
impl HttpHandlerTask for ServerError {
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
{
let mut bytes = io.buffer();
let bytes = io.buffer();
helpers::write_status_line(self.0, self.1.as_u16(), bytes);
}
io.set_date();

View File

@ -9,10 +9,7 @@ use tokio_timer::Delay;
use error::{Error, PayloadError};
use http::{StatusCode, Version};
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use payload::{Payload, PayloadStatus, PayloadWriter};
use pipeline::Pipeline;
use super::error::ServerError;
use super::h1decoder::{DecoderError, H1Decoder, Message};

View File

@ -149,20 +149,18 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
let mut buffer = self.buffer.as_mut();
let reason = msg.reason().as_bytes();
let mut is_bin = if let Body::Binary(ref bytes) = body {
if let Body::Binary(ref bytes) = body {
buffer.reserve(
256
+ msg.headers().len() * AVERAGE_HEADER_SIZE
+ bytes.len()
+ reason.len(),
);
true
} else {
buffer.reserve(
256 + msg.headers().len() * AVERAGE_HEADER_SIZE + reason.len(),
);
false
};
}
// status line
helpers::write_status_line(version, msg.status().as_u16(), &mut buffer);

View File

@ -15,11 +15,7 @@ use tokio_timer::Delay;
use error::{Error, PayloadError};
use http::{StatusCode, Version};
use httpmessage::HttpMessage;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
use payload::{Payload, PayloadStatus, PayloadWriter};
use pipeline::Pipeline;
use uri::Url;
use super::error::ServerError;

View File

@ -172,7 +172,8 @@ impl Request {
}
#[inline]
pub(crate) fn reset(&mut self) {
/// Reset request instance
pub fn reset(&mut self) {
self.inner.headers.clear();
self.inner.extensions.borrow_mut().clear();
self.inner.flags.set(MessageFlags::empty());
@ -210,6 +211,7 @@ impl RequestPool {
}
#[inline]
/// Release request instance
pub fn release(&self, mut msg: Request) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {

View File

@ -33,7 +33,6 @@ use actix::Message;
use body::Binary;
use error::Error;
use header::ContentEncoding;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
/// max buffer size 64k

View File

@ -10,12 +10,10 @@ use bytes::BytesMut;
use flate2::write::{DeflateEncoder, GzEncoder};
#[cfg(feature = "flate2")]
use flate2::Compression;
use http::header::{
HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING,
};
use http::{HttpTryFrom, Method, Version};
use http::header::{ACCEPT_ENCODING, CONTENT_LENGTH};
use http::Version;
use super::message::{InnerRequest, Request};
use super::message::InnerRequest;
use body::{Binary, Body};
use header::ContentEncoding;
use httpresponse::HttpResponse;

View File

@ -11,7 +11,6 @@ use parking_lot::Mutex;
use time;
use super::channel::Node;
use super::helpers;
use super::message::{Request, RequestPool};
use super::KeepAlive;
use body::Body;
@ -161,7 +160,6 @@ pub(crate) struct WorkerSettings<H> {
channels: Cell<usize>,
node: Box<Node<()>>,
date: UnsafeCell<Date>,
settings: ServerSettings,
}
impl<H> WorkerSettings<H> {
@ -177,13 +175,12 @@ impl<H> WorkerSettings<H> {
WorkerSettings {
h: RefCell::new(h),
bytes: Rc::new(SharedBytesPool::new()),
messages: RequestPool::pool(settings.clone()),
messages: RequestPool::pool(settings),
channels: Cell::new(0),
node: Box::new(Node::head()),
date: UnsafeCell::new(Date::new()),
keep_alive,
ka_enabled,
settings,
}
}

View File

@ -5,7 +5,7 @@ use std::{io, net, thread};
use actix::{
fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler2, System, WrapFuture,
Response, StreamHandler, System, WrapFuture,
};
use futures::sync::mpsc;
@ -447,7 +447,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
// start http server actor
let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| {
ctx.add_stream2(rx);
ctx.add_stream(rx);
self
});
if let Some(signals) = signals {
@ -613,51 +613,55 @@ impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H> {
}
/// Commands from accept threads
impl<H: IntoHttpHandler> StreamHandler2<ServerCommand, ()> for HttpServer<H> {
fn handle(&mut self, msg: Result<Option<ServerCommand>, ()>, _: &mut Context<Self>) {
if let Ok(Some(ServerCommand::WorkerDied(idx, socks))) = msg {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}
impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
fn finished(&mut self, _: &mut Context<Self>) {}
if found {
error!("Worker has died {:?}, restarting", idx);
let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx, socks) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
break;
}
let ka = self.keep_alive;
let factory = Arc::clone(&self.factory);
let host = self.host.clone();
let addr = socks[0].addr;
if found {
error!("Worker has died {:?}, restarting", idx);
let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let settings = ServerSettings::new(Some(addr), &host, false);
let apps: Vec<_> =
(*factory)().into_iter().map(|h| h.into_handler()).collect();
ctx.add_message_stream(rx);
Worker::new(apps, socks, ka, settings)
});
for item in &self.accept {
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
let _ = item.0.set_readiness(mio::Ready::readable());
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let ka = self.keep_alive;
let factory = Arc::clone(&self.factory);
let host = self.host.clone();
let addr = socks[0].addr;
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let settings = ServerSettings::new(Some(addr), &host, false);
let apps: Vec<_> =
(*factory)().into_iter().map(|h| h.into_handler()).collect();
ctx.add_message_stream(rx);
Worker::new(apps, socks, ka, settings)
});
for item in &self.accept {
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
let _ = item.0.set_readiness(mio::Ready::readable());
}
self.workers.push((new_idx, addr));
}
self.workers.push((new_idx, addr));
}
}
}