From 8a058efb4e3fa0554579552abf0d34328c635c19 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 11 Jan 2018 18:35:05 -0800 Subject: [PATCH] move server protocol impl to submodule --- guide/src/qs_3_5.md | 2 +- src/application.rs | 3 +- src/lib.rs | 15 +--- src/pipeline.rs | 3 +- src/{ => server}/channel.rs | 55 +------------- src/{ => server}/h1.rs | 12 +-- src/{ => server}/h1writer.rs | 25 +------ src/{ => server}/h2.rs | 7 +- src/{ => server}/h2writer.rs | 3 +- src/server/mod.rs | 109 +++++++++++++++++++++++++++ src/server/settings.rs | 125 +++++++++++++++++++++++++++++++ src/{server.rs => server/srv.rs} | 76 ++----------------- src/{ => server}/worker.rs | 63 ++-------------- src/test.rs | 3 +- tests/test_server.rs | 4 +- 15 files changed, 269 insertions(+), 236 deletions(-) rename src/{ => server}/channel.rs (87%) rename src/{ => server}/h1.rs (99%) rename src/{ => server}/h1writer.rs (92%) rename src/{ => server}/h2.rs (99%) rename src/{ => server}/h2writer.rs (98%) create mode 100644 src/server/mod.rs create mode 100644 src/server/settings.rs rename src/{server.rs => server/srv.rs} (93%) rename src/{ => server}/worker.rs (78%) diff --git a/guide/src/qs_3_5.md b/guide/src/qs_3_5.md index 580f029d9..977c1da8d 100644 --- a/guide/src/qs_3_5.md +++ b/guide/src/qs_3_5.md @@ -67,7 +67,7 @@ fn main() { let addr = rx.recv().unwrap(); let _ = addr.call_fut( - dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. + server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. } ``` diff --git a/src/application.rs b/src/application.rs index 8cf5db269..f7b93e1f2 100644 --- a/src/application.rs +++ b/src/application.rs @@ -8,10 +8,9 @@ use router::{Router, Pattern}; use resource::Resource; use handler::{Handler, RouteHandler, WrapHandler}; use httprequest::HttpRequest; -use channel::{HttpHandler, IntoHttpHandler, HttpHandlerTask}; use pipeline::{Pipeline, PipelineHandler}; use middleware::Middleware; -use server::ServerSettings; +use server::{HttpHandler, IntoHttpHandler, HttpHandlerTask, ServerSettings}; /// Application pub struct HttpApplication { diff --git a/src/lib.rs b/src/lib.rs index cd74f6f0d..cafe803ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,13 +104,6 @@ mod param; mod resource; mod handler; mod pipeline; -mod server; -mod worker; -mod channel; -mod h1; -mod h2; -mod h1writer; -mod h2writer; pub mod fs; pub mod ws; @@ -121,17 +114,18 @@ pub mod middleware; pub mod pred; pub mod test; pub mod payload; +pub mod server; pub use error::{Error, Result, ResponseError}; pub use body::{Body, Binary}; -pub use json::{Json}; +pub use json::Json; pub use application::Application; pub use httprequest::HttpRequest; pub use httpresponse::HttpResponse; pub use handler::{Reply, Responder, NormalizePath, AsyncResponder}; pub use route::Route; pub use resource::Resource; -pub use server::HttpServer; pub use context::HttpContext; +pub use server::HttpServer; // re-exports pub use http::{Method, StatusCode, Version}; @@ -171,10 +165,7 @@ pub mod dev { pub use handler::Handler; pub use json::JsonBody; pub use router::{Router, Pattern}; - pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; pub use param::{FromParam, Params}; pub use httprequest::{UrlEncoded, RequestBody}; pub use httpresponse::HttpResponseBuilder; - - pub use server::{ServerSettings, PauseServer, ResumeServer, StopServer}; } diff --git a/src/pipeline.rs b/src/pipeline.rs index 195a12d9b..0cd6f7531 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -6,16 +6,15 @@ use std::marker::PhantomData; use futures::{Async, Poll, Future, Stream}; use futures::unsync::oneshot; -use channel::HttpHandlerTask; use body::{Body, BodyStream}; use context::{Frame, ActorHttpContext}; use error::Error; use handler::{Reply, ReplyItem}; -use h1writer::{Writer, WriterState}; use httprequest::HttpRequest; use httpresponse::HttpResponse; use middleware::{Middleware, Finished, Started, Response}; use application::Inner; +use server::{Writer, WriterState, HttpHandlerTask}; pub(crate) trait PipelineHandler { fn handle(&mut self, req: HttpRequest) -> Reply; diff --git a/src/channel.rs b/src/server/channel.rs similarity index 87% rename from src/channel.rs rename to src/server/channel.rs index ef8afbd16..aadbe8532 100644 --- a/src/channel.rs +++ b/src/server/channel.rs @@ -7,49 +7,10 @@ use futures::{Future, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; -use {h1, h2}; -use error::Error; -use h1writer::Writer; -use httprequest::HttpRequest; -use server::ServerSettings; -use worker::WorkerSettings; +use super::{h1, h2, HttpHandler, IoStream}; +use super::settings::WorkerSettings; -/// Low level http request handler -#[allow(unused_variables)] -pub trait HttpHandler: 'static { - - /// Handle request - fn handle(&mut self, req: HttpRequest) -> Result, HttpRequest>; -} - -pub trait HttpHandlerTask { - - fn poll_io(&mut self, io: &mut Writer) -> Poll; - - fn poll(&mut self) -> Poll<(), Error>; - - fn disconnected(&mut self); -} - -/// Conversion helper trait -pub trait IntoHttpHandler { - /// The associated type which is result of conversion. - type Handler: HttpHandler; - - /// Convert into `HttpHandler` object. - fn into_handler(self, settings: ServerSettings) -> Self::Handler; -} - -impl IntoHttpHandler for T { - type Handler = T; - - fn into_handler(self, _: ServerSettings) -> Self::Handler { - self - } -} - -enum HttpProtocol -{ +enum HttpProtocol { H1(h1::Http1), H2(h2::Http2), } @@ -247,16 +208,6 @@ impl Node<()> { } } - -/// Low-level io stream operations -pub trait IoStream: AsyncRead + AsyncWrite + 'static { - fn shutdown(&mut self, how: Shutdown) -> io::Result<()>; - - fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>; - - fn set_linger(&mut self, dur: Option) -> io::Result<()>; -} - impl IoStream for TcpStream { #[inline] fn shutdown(&mut self, how: Shutdown) -> io::Result<()> { diff --git a/src/h1.rs b/src/server/h1.rs similarity index 99% rename from src/h1.rs rename to src/server/h1.rs index c0a1c68db..8ddb68628 100644 --- a/src/h1.rs +++ b/src/server/h1.rs @@ -14,14 +14,16 @@ use tokio_core::reactor::Timeout; use pipeline::Pipeline; use encoding::PayloadType; -use channel::{HttpHandler, HttpHandlerTask, IoStream}; -use h1writer::{Writer, H1Writer}; -use worker::WorkerSettings; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use error::{ParseError, PayloadError, ResponseError}; use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE}; +use super::Writer; +use super::h1writer::H1Writer; +use super::settings::WorkerSettings; +use super::{HttpHandler, HttpHandlerTask, IoStream}; + const LW_BUFFER_SIZE: usize = 4096; const HW_BUFFER_SIZE: usize = 16_384; const MAX_BUFFER_SIZE: usize = 131_072; @@ -901,8 +903,8 @@ mod tests { use super::*; use application::HttpApplication; - use worker::WorkerSettings; - use channel::IoStream; + use server::settings::WorkerSettings; + use server::IoStream; struct Buffer { buf: Bytes, diff --git a/src/h1writer.rs b/src/server/h1writer.rs similarity index 92% rename from src/h1writer.rs rename to src/server/h1writer.rs index fd5551724..75ae37115 100644 --- a/src/h1writer.rs +++ b/src/server/h1writer.rs @@ -11,32 +11,9 @@ use helpers::SharedBytes; use encoding::PayloadEncoder; use httprequest::HttpMessage; use httpresponse::HttpResponse; +use server::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific -const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k - - -#[derive(Debug)] -pub enum WriterState { - Done, - Pause, -} - -/// Send stream -pub trait Writer { - fn written(&self) -> u64; - - fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) - -> Result; - - fn write(&mut self, payload: &[u8]) -> Result; - - fn write_eof(&mut self) -> Result; - - fn flush(&mut self) -> Poll<(), io::Error>; - - fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>; -} bitflags! { struct Flags: u8 { diff --git a/src/h2.rs b/src/server/h2.rs similarity index 99% rename from src/h2.rs rename to src/server/h2.rs index e60d799d6..e247d2b34 100644 --- a/src/h2.rs +++ b/src/server/h2.rs @@ -15,15 +15,16 @@ use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::reactor::Timeout; use pipeline::Pipeline; -use h2writer::H2Writer; -use worker::WorkerSettings; -use channel::{HttpHandler, HttpHandlerTask}; use error::PayloadError; use encoding::PayloadType; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use payload::{Payload, PayloadWriter}; +use super::h2writer::H2Writer; +use super::settings::WorkerSettings; +use super::{HttpHandler, HttpHandlerTask}; + bitflags! { struct Flags: u8 { const DISCONNECTED = 0b0000_0010; diff --git a/src/h2writer.rs b/src/server/h2writer.rs similarity index 98% rename from src/h2writer.rs rename to src/server/h2writer.rs index 4707b8ee5..8bf8f94fb 100644 --- a/src/h2writer.rs +++ b/src/server/h2writer.rs @@ -12,10 +12,9 @@ use helpers::SharedBytes; use encoding::PayloadEncoder; use httprequest::HttpMessage; use httpresponse::HttpResponse; -use h1writer::{Writer, WriterState}; +use server::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; const CHUNK_SIZE: usize = 16_384; -const MAX_WRITE_BUFFER_SIZE: usize = 65_536; // max buffer size 64k bitflags! { struct Flags: u8 { diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 000000000..1903eefa2 --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,109 @@ +//! Http server +use std::{time, io}; +use std::net::Shutdown; + +use futures::Poll; +use tokio_io::{AsyncRead, AsyncWrite}; + +mod srv; +mod worker; +mod channel; +mod h1; +mod h2; +mod h1writer; +mod h2writer; +mod settings; + +pub use self::srv::HttpServer; +pub use self::settings::ServerSettings; + +use error::Error; +use httprequest::{HttpMessage, HttpRequest}; +use httpresponse::HttpResponse; + +/// max buffer size 64k +pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536; + +/// Pause accepting incoming connections +/// +/// If socket contains some pending connection, they might be dropped. +/// All opened connection remains active. +#[derive(Message)] +pub struct PauseServer; + +/// Resume accepting incoming connections +#[derive(Message)] +pub struct ResumeServer; + +/// Stop incoming connection processing, stop all workers and exit. +/// +/// If server starts with `spawn()` method, then spawned thread get terminated. +#[derive(Message)] +pub struct StopServer { + pub graceful: bool +} + +/// Low level http request handler +#[allow(unused_variables)] +pub trait HttpHandler: 'static { + + /// Handle request + fn handle(&mut self, req: HttpRequest) -> Result, HttpRequest>; +} + +pub trait HttpHandlerTask { + + fn poll_io(&mut self, io: &mut Writer) -> Poll; + + fn poll(&mut self) -> Poll<(), Error>; + + fn disconnected(&mut self); +} + +/// Conversion helper trait +pub trait IntoHttpHandler { + /// The associated type which is result of conversion. + type Handler: HttpHandler; + + /// Convert into `HttpHandler` object. + fn into_handler(self, settings: ServerSettings) -> Self::Handler; +} + +impl IntoHttpHandler for T { + type Handler = T; + + fn into_handler(self, _: ServerSettings) -> Self::Handler { + self + } +} + +/// Low-level io stream operations +pub trait IoStream: AsyncRead + AsyncWrite + 'static { + fn shutdown(&mut self, how: Shutdown) -> io::Result<()>; + + fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>; + + fn set_linger(&mut self, dur: Option) -> io::Result<()>; +} + +#[derive(Debug)] +pub enum WriterState { + Done, + Pause, +} + +/// Stream writer +pub trait Writer { + fn written(&self) -> u64; + + fn start(&mut self, req: &mut HttpMessage, resp: &mut HttpResponse) + -> Result; + + fn write(&mut self, payload: &[u8]) -> Result; + + fn write_eof(&mut self) -> Result; + + fn flush(&mut self) -> Poll<(), io::Error>; + + fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>; +} diff --git a/src/server/settings.rs b/src/server/settings.rs new file mode 100644 index 000000000..b6cc634ed --- /dev/null +++ b/src/server/settings.rs @@ -0,0 +1,125 @@ +use std::net; +use std::rc::Rc; +use std::cell::{Cell, RefCell, RefMut}; + +use helpers; +use super::channel::Node; + +/// Various server settings +#[derive(Debug, Clone)] +pub struct ServerSettings { + addr: Option, + secure: bool, + host: String, +} + +impl Default for ServerSettings { + fn default() -> Self { + ServerSettings { + addr: None, + secure: false, + host: "localhost:8080".to_owned(), + } + } +} + +impl ServerSettings { + /// Crate server settings instance + pub(crate) fn new(addr: Option, host: &Option, secure: bool) + -> ServerSettings + { + let host = if let Some(ref host) = *host { + host.clone() + } else if let Some(ref addr) = addr { + format!("{}", addr) + } else { + "localhost".to_owned() + }; + ServerSettings { + addr: addr, + secure: secure, + host: host, + } + } + + /// Returns the socket address of the local half of this TCP connection + pub fn local_addr(&self) -> Option { + self.addr + } + + /// Returns true if connection is secure(https) + pub fn secure(&self) -> bool { + self.secure + } + + /// Returns host header value + pub fn host(&self) -> &str { + &self.host + } +} + + +pub(crate) struct WorkerSettings { + h: RefCell>, + enabled: bool, + keep_alive: u64, + bytes: Rc, + messages: Rc, + channels: Cell, + node: Node<()>, +} + +impl WorkerSettings { + pub(crate) fn new(h: Vec, keep_alive: Option) -> WorkerSettings { + WorkerSettings { + h: RefCell::new(h), + enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, + keep_alive: keep_alive.unwrap_or(0), + bytes: Rc::new(helpers::SharedBytesPool::new()), + messages: Rc::new(helpers::SharedMessagePool::new()), + channels: Cell::new(0), + node: Node::head(), + } + } + + pub fn num_channels(&self) -> usize { + self.channels.get() + } + + pub fn head(&self) -> &Node<()> { + &self.node + } + + pub fn handlers(&self) -> RefMut> { + self.h.borrow_mut() + } + + pub fn keep_alive(&self) -> u64 { + self.keep_alive + } + + pub fn keep_alive_enabled(&self) -> bool { + self.enabled + } + + pub fn get_shared_bytes(&self) -> helpers::SharedBytes { + helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) + } + + pub fn get_http_message(&self) -> helpers::SharedHttpMessage { + helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages)) + } + + pub fn add_channel(&self) { + self.channels.set(self.channels.get() + 1); + } + + pub fn remove_channel(&self) { + let num = self.channels.get(); + if num > 0 { + self.channels.set(num-1); + } else { + error!("Number of removed channels is bigger than added channel. Bug in actix-web"); + } + } +} diff --git a/src/server.rs b/src/server/srv.rs similarity index 93% rename from src/server.rs rename to src/server/srv.rs index ded8c715b..050c862c0 100644 --- a/src/server.rs +++ b/src/server/srv.rs @@ -28,59 +28,12 @@ use openssl::pkcs12::ParsedPkcs12; use tokio_openssl::SslStream; use helpers; -use channel::{HttpChannel, HttpHandler, IntoHttpHandler, IoStream, WrapperStream}; -use worker::{Conn, Worker, WorkerSettings, StreamHandlerType, StopWorker}; +use super::{HttpHandler, IntoHttpHandler, IoStream}; +use super::{PauseServer, ResumeServer, StopServer}; +use super::channel::{HttpChannel, WrapperStream}; +use super::worker::{Conn, Worker, StreamHandlerType, StopWorker}; +use super::settings::{ServerSettings, WorkerSettings}; -/// Various server settings -#[derive(Debug, Clone)] -pub struct ServerSettings { - addr: Option, - secure: bool, - host: String, -} - -impl Default for ServerSettings { - fn default() -> Self { - ServerSettings { - addr: None, - secure: false, - host: "localhost:8080".to_owned(), - } - } -} - -impl ServerSettings { - /// Crate server settings instance - fn new(addr: Option, host: &Option, secure: bool) -> Self { - let host = if let Some(ref host) = *host { - host.clone() - } else if let Some(ref addr) = addr { - format!("{}", addr) - } else { - "localhost".to_owned() - }; - ServerSettings { - addr: addr, - secure: secure, - host: host, - } - } - - /// Returns the socket address of the local half of this TCP connection - pub fn local_addr(&self) -> Option { - self.addr - } - - /// Returns true if connection is secure(https) - pub fn secure(&self) -> bool { - self.secure - } - - /// Returns host header value - pub fn host(&self) -> &str { - &self.host - } -} /// An HTTP Server /// @@ -585,25 +538,6 @@ impl Handler>> for HttpServer } } -/// Pause accepting incoming connections -/// -/// If socket contains some pending connection, they might be dropped. -/// All opened connection remains active. -#[derive(Message)] -pub struct PauseServer; - -/// Resume accepting incoming connections -#[derive(Message)] -pub struct ResumeServer; - -/// Stop incoming connection processing, stop all workers and exit. -/// -/// If server starts with `spawn()` method, then spawned thread get terminated. -#[derive(Message)] -pub struct StopServer { - pub graceful: bool -} - impl Handler for HttpServer where T: IoStream, H: HttpHandler + 'static, diff --git a/src/worker.rs b/src/server/worker.rs similarity index 78% rename from src/worker.rs rename to src/server/worker.rs index 7b996a430..1daab36f8 100644 --- a/src/worker.rs +++ b/src/server/worker.rs @@ -1,6 +1,5 @@ use std::{net, time}; use std::rc::Rc; -use std::cell::{Cell, RefCell, RefMut}; use futures::Future; use futures::unsync::oneshot; use tokio_core::net::TcpStream; @@ -25,7 +24,9 @@ use actix::*; use actix::msgs::StopArbiter; use helpers; -use channel::{HttpChannel, HttpHandler, Node}; +use server::HttpHandler; +use server::channel::HttpChannel; +use server::settings::WorkerSettings; #[derive(Message)] @@ -43,60 +44,6 @@ pub(crate) struct StopWorker { pub graceful: Option, } -pub(crate) struct WorkerSettings { - h: RefCell>, - enabled: bool, - keep_alive: u64, - bytes: Rc, - messages: Rc, - channels: Cell, - node: Node<()>, -} - -impl WorkerSettings { - pub(crate) fn new(h: Vec, keep_alive: Option) -> WorkerSettings { - WorkerSettings { - h: RefCell::new(h), - enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, - keep_alive: keep_alive.unwrap_or(0), - bytes: Rc::new(helpers::SharedBytesPool::new()), - messages: Rc::new(helpers::SharedMessagePool::new()), - channels: Cell::new(0), - node: Node::head(), - } - } - - pub fn head(&self) -> &Node<()> { - &self.node - } - pub fn handlers(&self) -> RefMut> { - self.h.borrow_mut() - } - pub fn keep_alive(&self) -> u64 { - self.keep_alive - } - pub fn keep_alive_enabled(&self) -> bool { - self.enabled - } - pub fn get_shared_bytes(&self) -> helpers::SharedBytes { - helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) - } - pub fn get_http_message(&self) -> helpers::SharedHttpMessage { - helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages)) - } - pub fn add_channel(&self) { - self.channels.set(self.channels.get()+1); - } - pub fn remove_channel(&self) { - let num = self.channels.get(); - if num > 0 { - self.channels.set(num-1); - } else { - error!("Number of removed channels is bigger than added channel. Bug in actix-web"); - } - } -} - /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests processing. @@ -127,7 +74,7 @@ impl Worker { tx: oneshot::Sender, dur: time::Duration) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - let num = slf.settings.channels.get(); + let num = slf.settings.num_channels(); if num == 0 { let _ = tx.send(true); Arbiter::arbiter().send(StopArbiter(0)); @@ -174,7 +121,7 @@ impl Handler for Worker type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { - let num = self.settings.channels.get(); + let num = self.settings.num_channels(); if num == 0 { info!("Shutting down http worker, 0 connections"); Self::reply(Ok(true)) diff --git a/src/test.rs b/src/test.rs index 22b09b29e..5616ae554 100644 --- a/src/test.rs +++ b/src/test.rs @@ -16,9 +16,7 @@ use tokio_core::reactor::Core; use net2::TcpBuilder; use error::Error; -use server::{HttpServer, ServerSettings}; use handler::{Handler, Responder, ReplyItem}; -use channel::{HttpHandler, IntoHttpHandler}; use middleware::Middleware; use application::{Application, HttpApplication}; use param::Params; @@ -26,6 +24,7 @@ use router::Router; use payload::Payload; use httprequest::HttpRequest; use httpresponse::HttpResponse; +use server::{HttpServer, HttpHandler, IntoHttpHandler, ServerSettings}; /// The `TestServer` type. /// diff --git a/tests/test_server.rs b/tests/test_server.rs index b88b25a43..bb6a6baef 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -72,12 +72,12 @@ fn test_start() { assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); // pause - let _ = srv_addr.call_fut(dev::PauseServer).wait(); + let _ = srv_addr.call_fut(server::PauseServer).wait(); thread::sleep(time::Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); // resume - let _ = srv_addr.call_fut(dev::ResumeServer).wait(); + let _ = srv_addr.call_fut(server::ResumeServer).wait(); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); }