From c9a52e3197d3d34e41732f54cb99983b8d1bd8e7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 8 Sep 2018 09:20:18 -0700 Subject: [PATCH] refactor date generatioin --- src/server/channel.rs | 5 ++- src/server/h1.rs | 9 +++-- src/server/h1writer.rs | 5 ++- src/server/h2.rs | 12 +++---- src/server/h2writer.rs | 11 +++--- src/server/http.rs | 9 +++-- src/server/settings.rs | 80 ++++++++++++++++++++++-------------------- 7 files changed, 64 insertions(+), 67 deletions(-) diff --git a/src/server/channel.rs b/src/server/channel.rs index d83e9a38..6d0992bc 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -1,5 +1,4 @@ use std::net::{Shutdown, SocketAddr}; -use std::rc::Rc; use std::{io, ptr, time}; use bytes::{Buf, BufMut, BytesMut}; @@ -15,7 +14,7 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; enum HttpProtocol { H1(h1::Http1), H2(h2::Http2), - Unknown(Rc>, Option, T, BytesMut), + Unknown(WorkerSettings, Option, T, BytesMut), } enum ProtocolKind { @@ -40,7 +39,7 @@ where H: HttpHandler + 'static, { pub(crate) fn new( - settings: Rc>, io: T, peer: Option, + settings: WorkerSettings, io: T, peer: Option, ) -> HttpChannel { let ka_timeout = settings.keep_alive_timer(); diff --git a/src/server/h1.rs b/src/server/h1.rs index afe143b4..82ab914a 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -1,6 +1,5 @@ use std::collections::VecDeque; use std::net::SocketAddr; -use std::rc::Rc; use std::time::{Duration, Instant}; use bytes::BytesMut; @@ -43,7 +42,7 @@ bitflags! { pub(crate) struct Http1 { flags: Flags, - settings: Rc>, + settings: WorkerSettings, addr: Option, stream: H1Writer, decoder: H1Decoder, @@ -90,7 +89,7 @@ where H: HttpHandler + 'static, { pub fn new( - settings: Rc>, stream: T, addr: Option, + settings: WorkerSettings, stream: T, addr: Option, buf: BytesMut, is_eof: bool, keepalive_timer: Option, ) -> Self { Http1 { @@ -99,7 +98,7 @@ where } else { Flags::KEEPALIVE }, - stream: H1Writer::new(stream, Rc::clone(&settings)), + stream: H1Writer::new(stream, settings.clone()), decoder: H1Decoder::new(), payload: None, tasks: VecDeque::new(), @@ -112,7 +111,7 @@ where #[inline] pub fn settings(&self) -> &WorkerSettings { - self.settings.as_ref() + &self.settings } #[inline] diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 72a68aeb..15451659 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -1,7 +1,6 @@ // #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] use std::io::{self, Write}; -use std::rc::Rc; use bytes::{BufMut, BytesMut}; use futures::{Async, Poll}; @@ -38,11 +37,11 @@ pub(crate) struct H1Writer { headers_size: u32, buffer: Output, buffer_capacity: usize, - settings: Rc>, + settings: WorkerSettings, } impl H1Writer { - pub fn new(stream: T, settings: Rc>) -> H1Writer { + pub fn new(stream: T, settings: WorkerSettings) -> H1Writer { H1Writer { flags: Flags::KEEPALIVE, written: 0, diff --git a/src/server/h2.rs b/src/server/h2.rs index 913e2cd7..ba52a884 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -38,7 +38,7 @@ where H: HttpHandler + 'static, { flags: Flags, - settings: Rc>, + settings: WorkerSettings, addr: Option, state: State>, tasks: VecDeque>, @@ -58,7 +58,7 @@ where H: HttpHandler + 'static, { pub fn new( - settings: Rc>, io: T, addr: Option, buf: Bytes, + settings: WorkerSettings, io: T, addr: Option, buf: Bytes, keepalive_timer: Option, ) -> Self { let extensions = io.extensions(); @@ -83,7 +83,7 @@ where } pub fn settings(&self) -> &WorkerSettings { - self.settings.as_ref() + &self.settings } pub fn poll(&mut self) -> Poll<(), ()> { @@ -224,7 +224,7 @@ where body, resp, self.addr, - &self.settings, + self.settings.clone(), self.extensions.clone(), )); } @@ -343,7 +343,7 @@ struct Entry { impl Entry { fn new( parts: Parts, recv: RecvStream, resp: SendResponse, - addr: Option, settings: &Rc>, + addr: Option, settings: WorkerSettings, extensions: Option>, ) -> Entry where @@ -387,7 +387,7 @@ impl Entry { )) }), payload: psender, - stream: H2Writer::new(resp, Rc::clone(settings)), + stream: H2Writer::new(resp, settings), flags: EntryFlags::empty(), recv, } diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index 398e9817..4bfc1b7c 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -1,14 +1,12 @@ #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] +use std::{cmp, io}; + use bytes::{Bytes, BytesMut}; use futures::{Async, Poll}; use http2::server::SendResponse; use http2::{Reason, SendStream}; use modhttp::Response; -use std::rc::Rc; -use std::{cmp, io}; - -use http::{HttpTryFrom, Method, Version}; use super::helpers; use super::message::Request; @@ -20,6 +18,7 @@ use header::ContentEncoding; use http::header::{ HeaderValue, CONNECTION, CONTENT_ENCODING, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, }; +use http::{HttpTryFrom, Method, Version}; use httpresponse::HttpResponse; const CHUNK_SIZE: usize = 16_384; @@ -40,12 +39,12 @@ pub(crate) struct H2Writer { written: u64, buffer: Output, buffer_capacity: usize, - settings: Rc>, + settings: WorkerSettings, } impl H2Writer { pub fn new( - respond: SendResponse, settings: Rc>, + respond: SendResponse, settings: WorkerSettings, ) -> H2Writer { H2Writer { stream: None, diff --git a/src/server/http.rs b/src/server/http.rs index 5059b132..b55842fa 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -1,5 +1,4 @@ use std::marker::PhantomData; -use std::rc::Rc; use std::sync::Arc; use std::{io, mem, net, time}; @@ -10,7 +9,7 @@ use futures::{Async, Poll, Stream}; use net2::TcpBuilder; use num_cpus; -use actix_net::{ssl, NewService, Service, Server}; +use actix_net::{ssl, NewService, Server, Service}; //#[cfg(feature = "tls")] //use native_tls::TlsAcceptor; @@ -603,7 +602,7 @@ where H: HttpHandler, Io: IoStream, { - settings: Rc>, + settings: WorkerSettings, tcp_ka: Option, _t: PhantomData, } @@ -621,7 +620,7 @@ where } else { None }; - let settings = WorkerSettings::create(apps, keep_alive, settings); + let settings = WorkerSettings::new(apps, keep_alive, settings); HttpServiceHandler { tcp_ka, @@ -647,7 +646,7 @@ where fn call(&mut self, mut req: Self::Request) -> Self::Future { let _ = req.set_nodelay(true); - HttpChannel::new(Rc::clone(&self.settings), req, None) + HttpChannel::new(self.settings.clone(), req, None) } // fn shutdown(&self, force: bool) { diff --git a/src/server/settings.rs b/src/server/settings.rs index 2ca0b9b9..439d0e75 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -2,22 +2,21 @@ use std::cell::{RefCell, RefMut, UnsafeCell}; use std::collections::VecDeque; use std::fmt::Write; use std::rc::Rc; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{env, fmt, net}; -use actix::Arbiter; use bytes::BytesMut; -use futures::Stream; +use futures::{future, Future}; use futures_cpupool::CpuPool; use http::StatusCode; use lazycell::LazyCell; use parking_lot::Mutex; use time; -use tokio_timer::{Delay, Interval}; +use tokio_timer::{sleep, Delay, Interval}; +use tokio_current_thread::spawn; use super::channel::Node; use super::message::{Request, RequestPool}; -// use super::server::{ConnectionRateTag, ConnectionTag, Connections}; use super::KeepAlive; use body::Body; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; @@ -134,34 +133,21 @@ impl ServerSettings { // "Sun, 06 Nov 1994 08:49:37 GMT".len() const DATE_VALUE_LENGTH: usize = 29; -pub(crate) struct WorkerSettings { +pub(crate) struct WorkerSettings(Rc>); + +struct Inner { h: Vec, keep_alive: u64, ka_enabled: bool, bytes: Rc, messages: &'static RequestPool, node: RefCell>, - date: UnsafeCell, + date: UnsafeCell<(bool, Date)>, } -impl WorkerSettings { - pub(crate) fn create( - apps: Vec, keep_alive: KeepAlive, settings: ServerSettings, - ) -> Rc> { - let settings = Rc::new(Self::new(apps, keep_alive, settings)); - - // periodic date update - let s = settings.clone(); - Arbiter::spawn( - Interval::new(Instant::now(), Duration::from_secs(1)) - .map_err(|_| ()) - .and_then(move |_| { - s.update_date(); - Ok(()) - }).fold((), |(), _| Ok(())), - ); - - settings +impl Clone for WorkerSettings { + fn clone(&self) -> Self { + WorkerSettings(self.0.clone()) } } @@ -175,23 +161,23 @@ impl WorkerSettings { KeepAlive::Disabled => (0, false), }; - WorkerSettings { + WorkerSettings(Rc::new(Inner { h, + keep_alive, + ka_enabled, bytes: Rc::new(SharedBytesPool::new()), messages: RequestPool::pool(settings), node: RefCell::new(Node::head()), - date: UnsafeCell::new(Date::new()), - keep_alive, - ka_enabled, - } + date: UnsafeCell::new((false, Date::new())), + })) } pub fn head(&self) -> RefMut> { - self.node.borrow_mut() + self.0.node.borrow_mut() } pub fn handlers(&self) -> &Vec { - &self.h + &self.0.h } pub fn keep_alive_timer(&self) -> Option { @@ -205,33 +191,49 @@ impl WorkerSettings { } pub fn keep_alive(&self) -> u64 { - self.keep_alive + self.0.keep_alive } pub fn keep_alive_enabled(&self) -> bool { - self.ka_enabled + self.0.ka_enabled } pub fn get_bytes(&self) -> BytesMut { - self.bytes.get_bytes() + self.0.bytes.get_bytes() } pub fn release_bytes(&self, bytes: BytesMut) { - self.bytes.release_bytes(bytes) + self.0.bytes.release_bytes(bytes) } pub fn get_request(&self) -> Request { - RequestPool::get(self.messages) + RequestPool::get(self.0.messages) } fn update_date(&self) { // Unsafe: WorkerSetting is !Sync and !Send - unsafe { &mut *self.date.get() }.update(); + unsafe { (&mut *self.0.date.get()).0 = false }; } +} +impl WorkerSettings { pub fn set_date(&self, dst: &mut BytesMut, full: bool) { // Unsafe: WorkerSetting is !Sync and !Send - let date_bytes = unsafe { &(*self.date.get()).bytes }; + let date_bytes = unsafe { + let date = &mut (*self.0.date.get()); + if !date.0 { + date.1.update(); + date.0 = true; + + // periodic date update + let s = self.clone(); + spawn(sleep(Duration::from_secs(1)).then(move |_| { + s.update_date(); + future::ok(()) + })); + } + &date.1.bytes + }; if full { let mut buf: [u8; 39] = [0; 39]; buf[..6].copy_from_slice(b"date: ");