1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

refactor date generatioin

This commit is contained in:
Nikolay Kim 2018-09-08 09:20:18 -07:00
parent 1907102685
commit c9a52e3197
7 changed files with 64 additions and 67 deletions

View File

@ -1,5 +1,4 @@
use std::net::{Shutdown, SocketAddr}; use std::net::{Shutdown, SocketAddr};
use std::rc::Rc;
use std::{io, ptr, time}; use std::{io, ptr, time};
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
@ -15,7 +14,7 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> { enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> {
H1(h1::Http1<T, H>), H1(h1::Http1<T, H>),
H2(h2::Http2<T, H>), H2(h2::Http2<T, H>),
Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut), Unknown(WorkerSettings<H>, Option<SocketAddr>, T, BytesMut),
} }
enum ProtocolKind { enum ProtocolKind {
@ -40,7 +39,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub(crate) fn new( pub(crate) fn new(
settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>, settings: WorkerSettings<H>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> { ) -> HttpChannel<T, H> {
let ka_timeout = settings.keep_alive_timer(); let ka_timeout = settings.keep_alive_timer();

View File

@ -1,6 +1,5 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::rc::Rc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use bytes::BytesMut; use bytes::BytesMut;
@ -43,7 +42,7 @@ bitflags! {
pub(crate) struct Http1<T: IoStream, H: HttpHandler + 'static> { pub(crate) struct Http1<T: IoStream, H: HttpHandler + 'static> {
flags: Flags, flags: Flags,
settings: Rc<WorkerSettings<H>>, settings: WorkerSettings<H>,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
stream: H1Writer<T, H>, stream: H1Writer<T, H>,
decoder: H1Decoder, decoder: H1Decoder,
@ -90,7 +89,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub fn new( pub fn new(
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>, settings: WorkerSettings<H>, stream: T, addr: Option<SocketAddr>,
buf: BytesMut, is_eof: bool, keepalive_timer: Option<Delay>, buf: BytesMut, is_eof: bool, keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
Http1 { Http1 {
@ -99,7 +98,7 @@ where
} else { } else {
Flags::KEEPALIVE Flags::KEEPALIVE
}, },
stream: H1Writer::new(stream, Rc::clone(&settings)), stream: H1Writer::new(stream, settings.clone()),
decoder: H1Decoder::new(), decoder: H1Decoder::new(),
payload: None, payload: None,
tasks: VecDeque::new(), tasks: VecDeque::new(),
@ -112,7 +111,7 @@ where
#[inline] #[inline]
pub fn settings(&self) -> &WorkerSettings<H> { pub fn settings(&self) -> &WorkerSettings<H> {
self.settings.as_ref() &self.settings
} }
#[inline] #[inline]

View File

@ -1,7 +1,6 @@
// #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] // #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
use std::io::{self, Write}; use std::io::{self, Write};
use std::rc::Rc;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
@ -38,11 +37,11 @@ pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
headers_size: u32, headers_size: u32,
buffer: Output, buffer: Output,
buffer_capacity: usize, buffer_capacity: usize,
settings: Rc<WorkerSettings<H>>, settings: WorkerSettings<H>,
} }
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> { impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
pub fn new(stream: T, settings: Rc<WorkerSettings<H>>) -> H1Writer<T, H> { pub fn new(stream: T, settings: WorkerSettings<H>) -> H1Writer<T, H> {
H1Writer { H1Writer {
flags: Flags::KEEPALIVE, flags: Flags::KEEPALIVE,
written: 0, written: 0,

View File

@ -38,7 +38,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
flags: Flags, flags: Flags,
settings: Rc<WorkerSettings<H>>, settings: WorkerSettings<H>,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
state: State<IoWrapper<T>>, state: State<IoWrapper<T>>,
tasks: VecDeque<Entry<H>>, tasks: VecDeque<Entry<H>>,
@ -58,7 +58,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub fn new( pub fn new(
settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes, settings: WorkerSettings<H>, io: T, addr: Option<SocketAddr>, buf: Bytes,
keepalive_timer: Option<Delay>, keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
let extensions = io.extensions(); let extensions = io.extensions();
@ -83,7 +83,7 @@ where
} }
pub fn settings(&self) -> &WorkerSettings<H> { pub fn settings(&self) -> &WorkerSettings<H> {
self.settings.as_ref() &self.settings
} }
pub fn poll(&mut self) -> Poll<(), ()> { pub fn poll(&mut self) -> Poll<(), ()> {
@ -224,7 +224,7 @@ where
body, body,
resp, resp,
self.addr, self.addr,
&self.settings, self.settings.clone(),
self.extensions.clone(), self.extensions.clone(),
)); ));
} }
@ -343,7 +343,7 @@ struct Entry<H: HttpHandler + 'static> {
impl<H: HttpHandler + 'static> Entry<H> { impl<H: HttpHandler + 'static> Entry<H> {
fn new( fn new(
parts: Parts, recv: RecvStream, resp: SendResponse<Bytes>, parts: Parts, recv: RecvStream, resp: SendResponse<Bytes>,
addr: Option<SocketAddr>, settings: &Rc<WorkerSettings<H>>, addr: Option<SocketAddr>, settings: WorkerSettings<H>,
extensions: Option<Rc<Extensions>>, extensions: Option<Rc<Extensions>>,
) -> Entry<H> ) -> Entry<H>
where where
@ -387,7 +387,7 @@ impl<H: HttpHandler + 'static> Entry<H> {
)) ))
}), }),
payload: psender, payload: psender,
stream: H2Writer::new(resp, Rc::clone(settings)), stream: H2Writer::new(resp, settings),
flags: EntryFlags::empty(), flags: EntryFlags::empty(),
recv, recv,
} }

View File

@ -1,14 +1,12 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
use std::{cmp, io};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
use http2::server::SendResponse; use http2::server::SendResponse;
use http2::{Reason, SendStream}; use http2::{Reason, SendStream};
use modhttp::Response; use modhttp::Response;
use std::rc::Rc;
use std::{cmp, io};
use http::{HttpTryFrom, Method, Version};
use super::helpers; use super::helpers;
use super::message::Request; use super::message::Request;
@ -20,6 +18,7 @@ use header::ContentEncoding;
use http::header::{ use http::header::{
HeaderValue, CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, HeaderValue, CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, DATE, TRANSFER_ENCODING,
}; };
use http::{HttpTryFrom, Method, Version};
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
@ -40,12 +39,12 @@ pub(crate) struct H2Writer<H: 'static> {
written: u64, written: u64,
buffer: Output, buffer: Output,
buffer_capacity: usize, buffer_capacity: usize,
settings: Rc<WorkerSettings<H>>, settings: WorkerSettings<H>,
} }
impl<H: 'static> H2Writer<H> { impl<H: 'static> H2Writer<H> {
pub fn new( pub fn new(
respond: SendResponse<Bytes>, settings: Rc<WorkerSettings<H>>, respond: SendResponse<Bytes>, settings: WorkerSettings<H>,
) -> H2Writer<H> { ) -> H2Writer<H> {
H2Writer { H2Writer {
stream: None, stream: None,

View File

@ -1,5 +1,4 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::{io, mem, net, time}; use std::{io, mem, net, time};
@ -10,7 +9,7 @@ use futures::{Async, Poll, Stream};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use actix_net::{ssl, NewService, Service, Server}; use actix_net::{ssl, NewService, Server, Service};
//#[cfg(feature = "tls")] //#[cfg(feature = "tls")]
//use native_tls::TlsAcceptor; //use native_tls::TlsAcceptor;
@ -603,7 +602,7 @@ where
H: HttpHandler, H: HttpHandler,
Io: IoStream, Io: IoStream,
{ {
settings: Rc<WorkerSettings<H>>, settings: WorkerSettings<H>,
tcp_ka: Option<time::Duration>, tcp_ka: Option<time::Duration>,
_t: PhantomData<Io>, _t: PhantomData<Io>,
} }
@ -621,7 +620,7 @@ where
} else { } else {
None None
}; };
let settings = WorkerSettings::create(apps, keep_alive, settings); let settings = WorkerSettings::new(apps, keep_alive, settings);
HttpServiceHandler { HttpServiceHandler {
tcp_ka, tcp_ka,
@ -647,7 +646,7 @@ where
fn call(&mut self, mut req: Self::Request) -> Self::Future { fn call(&mut self, mut req: Self::Request) -> Self::Future {
let _ = req.set_nodelay(true); let _ = req.set_nodelay(true);
HttpChannel::new(Rc::clone(&self.settings), req, None) HttpChannel::new(self.settings.clone(), req, None)
} }
// fn shutdown(&self, force: bool) { // fn shutdown(&self, force: bool) {

View File

@ -2,22 +2,21 @@ use std::cell::{RefCell, RefMut, UnsafeCell};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt::Write; use std::fmt::Write;
use std::rc::Rc; use std::rc::Rc;
use std::time::{Duration, Instant}; use std::time::Duration;
use std::{env, fmt, net}; use std::{env, fmt, net};
use actix::Arbiter;
use bytes::BytesMut; use bytes::BytesMut;
use futures::Stream; use futures::{future, Future};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use http::StatusCode; use http::StatusCode;
use lazycell::LazyCell; use lazycell::LazyCell;
use parking_lot::Mutex; use parking_lot::Mutex;
use time; use time;
use tokio_timer::{Delay, Interval}; use tokio_timer::{sleep, Delay, Interval};
use tokio_current_thread::spawn;
use super::channel::Node; use super::channel::Node;
use super::message::{Request, RequestPool}; use super::message::{Request, RequestPool};
// use super::server::{ConnectionRateTag, ConnectionTag, Connections};
use super::KeepAlive; use super::KeepAlive;
use body::Body; use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
@ -134,34 +133,21 @@ impl ServerSettings {
// "Sun, 06 Nov 1994 08:49:37 GMT".len() // "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH: usize = 29; const DATE_VALUE_LENGTH: usize = 29;
pub(crate) struct WorkerSettings<H> { pub(crate) struct WorkerSettings<H>(Rc<Inner<H>>);
struct Inner<H> {
h: Vec<H>, h: Vec<H>,
keep_alive: u64, keep_alive: u64,
ka_enabled: bool, ka_enabled: bool,
bytes: Rc<SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: &'static RequestPool, messages: &'static RequestPool,
node: RefCell<Node<()>>, node: RefCell<Node<()>>,
date: UnsafeCell<Date>, date: UnsafeCell<(bool, Date)>,
} }
impl<H: 'static> WorkerSettings<H> { impl<H> Clone for WorkerSettings<H> {
pub(crate) fn create( fn clone(&self) -> Self {
apps: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings, WorkerSettings(self.0.clone())
) -> Rc<WorkerSettings<H>> {
let settings = Rc::new(Self::new(apps, keep_alive, settings));
// periodic date update
let s = settings.clone();
Arbiter::spawn(
Interval::new(Instant::now(), Duration::from_secs(1))
.map_err(|_| ())
.and_then(move |_| {
s.update_date();
Ok(())
}).fold((), |(), _| Ok(())),
);
settings
} }
} }
@ -175,23 +161,23 @@ impl<H> WorkerSettings<H> {
KeepAlive::Disabled => (0, false), KeepAlive::Disabled => (0, false),
}; };
WorkerSettings { WorkerSettings(Rc::new(Inner {
h, h,
keep_alive,
ka_enabled,
bytes: Rc::new(SharedBytesPool::new()), bytes: Rc::new(SharedBytesPool::new()),
messages: RequestPool::pool(settings), messages: RequestPool::pool(settings),
node: RefCell::new(Node::head()), node: RefCell::new(Node::head()),
date: UnsafeCell::new(Date::new()), date: UnsafeCell::new((false, Date::new())),
keep_alive, }))
ka_enabled,
}
} }
pub fn head(&self) -> RefMut<Node<()>> { pub fn head(&self) -> RefMut<Node<()>> {
self.node.borrow_mut() self.0.node.borrow_mut()
} }
pub fn handlers(&self) -> &Vec<H> { pub fn handlers(&self) -> &Vec<H> {
&self.h &self.0.h
} }
pub fn keep_alive_timer(&self) -> Option<Delay> { pub fn keep_alive_timer(&self) -> Option<Delay> {
@ -205,33 +191,49 @@ impl<H> WorkerSettings<H> {
} }
pub fn keep_alive(&self) -> u64 { pub fn keep_alive(&self) -> u64 {
self.keep_alive self.0.keep_alive
} }
pub fn keep_alive_enabled(&self) -> bool { pub fn keep_alive_enabled(&self) -> bool {
self.ka_enabled self.0.ka_enabled
} }
pub fn get_bytes(&self) -> BytesMut { pub fn get_bytes(&self) -> BytesMut {
self.bytes.get_bytes() self.0.bytes.get_bytes()
} }
pub fn release_bytes(&self, bytes: BytesMut) { pub fn release_bytes(&self, bytes: BytesMut) {
self.bytes.release_bytes(bytes) self.0.bytes.release_bytes(bytes)
} }
pub fn get_request(&self) -> Request { pub fn get_request(&self) -> Request {
RequestPool::get(self.messages) RequestPool::get(self.0.messages)
} }
fn update_date(&self) { fn update_date(&self) {
// Unsafe: WorkerSetting is !Sync and !Send // Unsafe: WorkerSetting is !Sync and !Send
unsafe { &mut *self.date.get() }.update(); unsafe { (&mut *self.0.date.get()).0 = false };
}
} }
impl<H: 'static> WorkerSettings<H> {
pub fn set_date(&self, dst: &mut BytesMut, full: bool) { pub fn set_date(&self, dst: &mut BytesMut, full: bool) {
// Unsafe: WorkerSetting is !Sync and !Send // Unsafe: WorkerSetting is !Sync and !Send
let date_bytes = unsafe { &(*self.date.get()).bytes }; let date_bytes = unsafe {
let date = &mut (*self.0.date.get());
if !date.0 {
date.1.update();
date.0 = true;
// periodic date update
let s = self.clone();
spawn(sleep(Duration::from_secs(1)).then(move |_| {
s.update_date();
future::ok(())
}));
}
&date.1.bytes
};
if full { if full {
let mut buf: [u8; 39] = [0; 39]; let mut buf: [u8; 39] = [0; 39];
buf[..6].copy_from_slice(b"date: "); buf[..6].copy_from_slice(b"date: ");