From 0f1c80ccc63840b9da646b268f2e07dd520c6837 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Sep 2018 08:45:49 -0700 Subject: [PATCH] deprecate start_incoming --- src/client/connector.rs | 4 +- src/server/channel.rs | 5 ++ src/server/http.rs | 115 +++++++++++++++++++--------------------- 3 files changed, 63 insertions(+), 61 deletions(-) diff --git a/src/client/connector.rs b/src/client/connector.rs index 6e82e3fd8..8d71913fe 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -51,7 +51,7 @@ type SslConnector = Arc; feature = "ssl", feature = "tls", feature = "rust-tls", -)))] +),))] type SslConnector = (); use server::IoStream; @@ -290,7 +290,7 @@ impl Default for ClientConnector { feature = "ssl", feature = "tls", feature = "rust-tls", - )))] + ),))] { () } diff --git a/src/server/channel.rs b/src/server/channel.rs index c1e6b6b24..0d92c23a3 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -1,6 +1,7 @@ use std::net::{Shutdown, SocketAddr}; use std::{io, ptr, time}; +use actix::Message; use bytes::{Buf, BufMut, BytesMut}; use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -282,6 +283,10 @@ where io: T, } +impl Message for WrapperStream { + type Result = (); +} + impl WrapperStream where T: AsyncRead + AsyncWrite + 'static, diff --git a/src/server/http.rs b/src/server/http.rs index 22537cb86..81c4d3ad6 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -1,11 +1,13 @@ use std::{io, mem, net}; -use actix::{Addr, System}; +use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System}; use actix_net::server::Server; use actix_net::ssl; +use futures::Stream; use net2::TcpBuilder; use num_cpus; +use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "tls")] use native_tls::TlsAcceptor; @@ -17,8 +19,10 @@ use openssl::ssl::SslAcceptorBuilder; use rustls::ServerConfig; use super::acceptor::{AcceptorServiceFactory, DefaultAcceptor}; -use super::builder::DefaultPipelineFactory; -use super::builder::{HttpServiceBuilder, ServiceProvider}; +use super::builder::{DefaultPipelineFactory, HttpServiceBuilder, ServiceProvider}; +use super::channel::{HttpChannel, WrapperStream}; +use super::handler::HttpHandler; +use super::settings::{ServerSettings, WorkerSettings}; use super::{IntoHttpHandler, KeepAlive}; struct Socket { @@ -520,67 +524,60 @@ impl H + Send + Clone> HttpServer { } } -// impl HttpServer { -// /// Start listening for incoming connections from a stream. -// /// -// /// This method uses only one thread for handling incoming connections. -// pub fn start_incoming(self, stream: S, secure: bool) -// where -// S: Stream + Send + 'static, -// T: AsyncRead + AsyncWrite + Send + 'static, -// { -// // set server settings -// let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); -// let srv_settings = ServerSettings::new(Some(addr), &self.host, secure); -// let apps: Vec<_> = (*self.factory)() -// .into_iter() -// .map(|h| h.into_handler()) -// .collect(); -// let settings = WorkerSettings::create( -// apps, -// self.keep_alive, -// srv_settings, -// ); +impl HttpServer +where + H: IntoHttpHandler, + F: Fn() -> H + Send + Clone, +{ + #[doc(hidden)] + #[deprecated(since = "0.7.8")] + /// Start listening for incoming connections from a stream. + /// + /// This method uses only one thread for handling incoming connections. + pub fn start_incoming(self, stream: S, secure: bool) + where + S: Stream + 'static, + T: AsyncRead + AsyncWrite + 'static, + { + // set server settings + let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let apps = (self.factory)().into_handler(); + let settings = WorkerSettings::new( + apps, + self.keep_alive, + self.client_timeout as u64, + ServerSettings::new(Some(addr), &self.host, secure), + ); -// // start server -// HttpIncoming::create(move |ctx| { -// ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { -// io: WrapperStream::new(t), -// handler: Token::new(0), -// token: Token::new(0), -// peer: None, -// })); -// HttpIncoming { settings } -// }); -// } -// } + // start server + HttpIncoming::create(move |ctx| { + ctx.add_message_stream( + stream.map_err(|_| ()).map(move |t| WrapperStream::new(t)), + ); + HttpIncoming { settings } + }); + } +} -// struct HttpIncoming { -// settings: Rc>, -// } +struct HttpIncoming { + settings: WorkerSettings, +} -// impl Actor for HttpIncoming -// where -// H: HttpHandler, -// { -// type Context = Context; -// } +impl Actor for HttpIncoming { + type Context = Context; +} -// impl Handler> for HttpIncoming -// where -// T: IoStream, -// H: HttpHandler, -// { -// type Result = (); +impl Handler> for HttpIncoming +where + T: AsyncRead + AsyncWrite, + H: HttpHandler, +{ + type Result = (); -// fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { -// spawn(HttpChannel::new( -// Rc::clone(&self.settings), -// msg.io, -// msg.peer, -// )); -// } -// } + fn handle(&mut self, msg: WrapperStream, _: &mut Context) -> Self::Result { + Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg, None)); + } +} fn create_tcp_listener( addr: net::SocketAddr, backlog: i32,