diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 6933d28e..997487c2 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -156,7 +156,7 @@ impl Framed { } impl Framed { - /// Serialize item and Write to the inner buffer + /// Serialize item and write to the inner buffer pub fn write(mut self: Pin<&mut Self>, item: I) -> Result<(), >::Error> where T: AsyncWrite, diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index ddd72641..2edfdc85 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -43,4 +43,4 @@ actix-rt = "2.4.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } -tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] } diff --git a/actix-server/examples/file-reader.rs b/actix-server/examples/file-reader.rs new file mode 100644 index 00000000..3cc991d3 --- /dev/null +++ b/actix-server/examples/file-reader.rs @@ -0,0 +1,93 @@ +//! Simple file-reader TCP server with framed stream. +//! +//! Using the following command: +//! +//! ```sh +//! nc 127.0.0.1 8080 +//! ``` +//! +//! Follow the prompt and enter a file path, relative or absolute. + +use std::io; + +use actix_codec::{Framed, LinesCodec}; +use actix_rt::net::TcpStream; +use actix_server::Server; +use actix_service::{fn_service, ServiceFactoryExt as _}; +use futures_util::{SinkExt as _, StreamExt as _}; +use tokio::{fs::File, io::AsyncReadExt as _}; + +async fn run() -> io::Result<()> { + env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); + + let addr = ("127.0.0.1", 8080); + log::info!("starting server on port: {}", &addr.0); + + // Bind socket address and start worker(s). By default, the server uses the number of physical + // CPU cores as the worker count. For this reason, the closure passed to bind needs to return + // a service *factory*; so it can be created once per worker. + Server::build() + .bind("file-reader", addr, move || { + fn_service(move |stream: TcpStream| async move { + // set up codec to use with I/O resource + let mut framed = Framed::new(stream, LinesCodec::default()); + + loop { + // prompt for file name + framed.send("Type file name to return:").await?; + + // wait for next line + match framed.next().await { + Some(Ok(line)) => { + match File::open(line).await { + Ok(mut file) => { + // read file into String buffer + let mut buf = String::new(); + file.read_to_string(&mut buf).await?; + + // send String into framed object + framed.send(buf).await?; + + // break out of loop and + break; + } + Err(err) => { + log::error!("{}", err); + framed + .send("File not found or not readable. Try again.") + .await?; + continue; + } + }; + } + + // not being able to read a line from the stream is unrecoverable + Some(Err(err)) => return Err(err), + + // This EOF won't be hit. + None => continue, + } + } + + // close connection after file has been copied to TCP stream + Ok(()) + }) + .map_err(|err| log::error!("Service Error: {:?}", err)) + })? + .workers(2) + .run() + .await +} + +#[tokio::main] +async fn main() -> io::Result<()> { + run().await?; + Ok(()) +} + +// alternatively: +// #[actix_rt::main] +// async fn main() -> io::Result<()> { +// run().await?; +// Ok(()) +// } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 548a3591..da0b7053 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -26,7 +26,7 @@ use log::{error, info}; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; async fn run() -> io::Result<()> { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); let count = Arc::new(AtomicUsize::new(0)); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index d2aa08e1..7a6123ef 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -6,7 +6,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ server::ServerCommand, - service::{InternalServiceFactory, ServiceFactory, StreamNewService}, + service::{InternalServiceFactory, ServerServiceFactory, StreamNewService}, socket::{ create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs, }, @@ -142,7 +142,7 @@ impl ServerBuilder { /// Add new service to the server. pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where - F: ServiceFactory, + F: ServerServiceFactory, U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; @@ -172,7 +172,7 @@ impl ServerBuilder { factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServerServiceFactory, { lst.set_nonblocking(true)?; let addr = lst.local_addr()?; @@ -213,7 +213,7 @@ impl ServerBuilder { /// Add new unix domain service to the server. pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where - F: ServiceFactory, + F: ServerServiceFactory, N: AsRef, U: AsRef, { @@ -240,7 +240,7 @@ impl ServerBuilder { factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServerServiceFactory, { use std::net::{IpAddr, Ipv4Addr}; lst.set_nonblocking(true)?; diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index f9b1a992..9bbb33a0 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -21,7 +21,7 @@ mod worker; pub use self::builder::ServerBuilder; pub use self::handle::ServerHandle; pub use self::server::Server; -pub use self::service::ServiceFactory; +pub use self::service::ServerServiceFactory; pub use self::test_server::TestServer; #[doc(hidden)] diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 28ffb4f1..24d2450c 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -1,16 +1,21 @@ -use std::marker::PhantomData; -use std::net::SocketAddr; -use std::task::{Context, Poll}; +use std::{ + marker::PhantomData, + net::SocketAddr, + task::{Context, Poll}, +}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; -use crate::socket::{FromStream, MioStream}; -use crate::worker::WorkerCounterGuard; +use crate::{ + socket::{FromStream, MioStream}, + worker::WorkerCounterGuard, +}; -pub trait ServiceFactory: Send + Clone + 'static { +#[doc(hidden)] +pub trait ServerServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; fn create(&self) -> Self::Factory; @@ -80,7 +85,7 @@ where } } -pub(crate) struct StreamNewService, Io: FromStream> { +pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, token: usize, @@ -90,7 +95,7 @@ pub(crate) struct StreamNewService, Io: FromStream> { impl StreamNewService where - F: ServiceFactory, + F: ServerServiceFactory, Io: FromStream + Send + 'static, { pub(crate) fn create( @@ -111,7 +116,7 @@ where impl InternalServiceFactory for StreamNewService where - F: ServiceFactory, + F: ServerServiceFactory, Io: FromStream + Send + 'static, { fn name(&self, _: usize) -> &str { @@ -143,7 +148,7 @@ where } } -impl ServiceFactory for F +impl ServerServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T: BaseServiceFactory, diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 7f281701..25291fd2 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,19 +1,18 @@ pub(crate) use std::net::{ SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, }; +use std::{fmt, io}; +use actix_rt::net::TcpStream; pub(crate) use mio::net::TcpListener as MioTcpListener; +use mio::{event::Source, Interest, Registry, Token}; + #[cfg(unix)] pub(crate) use { mio::net::UnixListener as MioUnixListener, std::os::unix::net::UnixListener as StdUnixListener, }; -use std::{fmt, io}; - -use actix_rt::net::TcpStream; -use mio::{event::Source, Interest, Registry, Token}; - pub(crate) enum MioListener { Tcp(MioTcpListener), #[cfg(unix)] diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index fc3bcbe3..16820f3e 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -2,7 +2,7 @@ use std::{io, net, sync::mpsc, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory}; +use crate::{Server, ServerBuilder, ServerHandle, ServerServiceFactory}; /// A testing server. /// @@ -66,7 +66,7 @@ impl TestServer { } /// Start new test server with application factory. - pub fn with>(factory: F) -> TestServerRuntime { + pub fn with>(factory: F) -> TestServerRuntime { let (tx, rx) = mpsc::channel(); // run server in separate thread diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index e175e325..5b988847 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -26,25 +26,88 @@ fn test_bind() { let srv = Server::build() .workers(1) .disable_signals() + .shutdown_timeout(3600) .bind("test", addr, move || { fn_service(|_| async { Ok::<_, ()>(()) }) })? .run(); - let _ = tx.send(srv.handle()); - + tx.send(srv.handle()).unwrap(); srv.await }) }); + let srv = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); - assert!(net::TcpStream::connect(addr).is_ok()); + + net::TcpStream::connect(addr).unwrap(); let _ = srv.stop(true); h.join().unwrap().unwrap(); } +#[test] +fn test_listen() { + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + let lst = net::TcpListener::bind(addr).unwrap(); + + let h = thread::spawn(move || { + actix_rt::System::new().block_on(async { + let srv = Server::build() + .workers(1) + .disable_signals() + .shutdown_timeout(3600) + .listen("test", lst, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? + .run(); + + tx.send(srv.handle()).unwrap(); + srv.await + }) + }); + + let srv = rx.recv().unwrap(); + + thread::sleep(Duration::from_millis(500)); + + net::TcpStream::connect(addr).unwrap(); + + let _ = srv.stop(true); + h.join().unwrap().unwrap(); +} + +// #[test] +// fn test_bind() { +// let addr = unused_addr(); +// let (tx, rx) = mpsc::channel(); + +// let h = thread::spawn(move || { +// actix_rt::System::new().block_on(async { +// let srv = Server::build() +// .workers(1) +// .disable_signals() +// .bind("test", addr, move || { +// fn_service(|_| async { Ok::<_, ()>(()) }) +// })? +// .run(); + +// let _ = tx.send(srv.handle()); + +// srv.await +// }) +// }); +// let srv = rx.recv().unwrap(); + +// thread::sleep(Duration::from_millis(500)); +// assert!(net::TcpStream::connect(addr).is_ok()); + +// let _ = srv.stop(true); +// h.join().unwrap().unwrap(); +// } + #[test] fn plain_tokio_runtime() { let addr = unused_addr(); @@ -80,37 +143,37 @@ fn plain_tokio_runtime() { h.join().unwrap().unwrap(); } -#[test] -fn test_listen() { - let addr = unused_addr(); - let lst = net::TcpListener::bind(addr).unwrap(); +// #[test] +// fn test_listen() { +// let addr = unused_addr(); +// let lst = net::TcpListener::bind(addr).unwrap(); - let (tx, rx) = mpsc::channel(); +// let (tx, rx) = mpsc::channel(); - let h = thread::spawn(move || { - actix_rt::System::new().block_on(async { - let srv = Server::build() - .disable_signals() - .workers(1) - .listen("test", lst, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) - })? - .run(); +// let h = thread::spawn(move || { +// actix_rt::System::new().block_on(async { +// let srv = Server::build() +// .disable_signals() +// .workers(1) +// .listen("test", lst, move || { +// fn_service(|_| async { Ok::<_, ()>(()) }) +// })? +// .run(); - let _ = tx.send(srv.handle()); +// let _ = tx.send(srv.handle()); - srv.await - }) - }); +// srv.await +// }) +// }); - let srv = rx.recv().unwrap(); +// let srv = rx.recv().unwrap(); - thread::sleep(Duration::from_millis(500)); - assert!(net::TcpStream::connect(addr).is_ok()); +// thread::sleep(Duration::from_millis(500)); +// assert!(net::TcpStream::connect(addr).is_ok()); - let _ = srv.stop(true); - h.join().unwrap().unwrap(); -} +// let _ = srv.stop(true); +// h.join().unwrap().unwrap(); +// } #[test] #[cfg(unix)]