1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 21:51:06 +01:00

add file reader example

This commit is contained in:
Rob Ede 2021-12-26 22:32:35 +00:00
parent 89a4c2ee27
commit 9935883905
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
10 changed files with 213 additions and 53 deletions

View File

@ -156,7 +156,7 @@ impl<T, U> Framed<T, U> {
} }
impl<T, U> Framed<T, U> { impl<T, U> Framed<T, U> {
/// Serialize item and Write to the inner buffer /// Serialize item and write to the inner buffer
pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error> pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error>
where where
T: AsyncWrite, T: AsyncWrite,

View File

@ -43,4 +43,4 @@ actix-rt = "2.4.0"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] }

View File

@ -0,0 +1,93 @@
//! Simple file-reader TCP server with framed stream.
//!
//! Using the following command:
//!
//! ```sh
//! nc 127.0.0.1 8080
//! ```
//!
//! Follow the prompt and enter a file path, relative or absolute.
use std::io;
use actix_codec::{Framed, LinesCodec};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _};
use futures_util::{SinkExt as _, StreamExt as _};
use tokio::{fs::File, io::AsyncReadExt as _};
async fn run() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let addr = ("127.0.0.1", 8080);
log::info!("starting server on port: {}", &addr.0);
// Bind socket address and start worker(s). By default, the server uses the number of physical
// CPU cores as the worker count. For this reason, the closure passed to bind needs to return
// a service *factory*; so it can be created once per worker.
Server::build()
.bind("file-reader", addr, move || {
fn_service(move |stream: TcpStream| async move {
// set up codec to use with I/O resource
let mut framed = Framed::new(stream, LinesCodec::default());
loop {
// prompt for file name
framed.send("Type file name to return:").await?;
// wait for next line
match framed.next().await {
Some(Ok(line)) => {
match File::open(line).await {
Ok(mut file) => {
// read file into String buffer
let mut buf = String::new();
file.read_to_string(&mut buf).await?;
// send String into framed object
framed.send(buf).await?;
// break out of loop and
break;
}
Err(err) => {
log::error!("{}", err);
framed
.send("File not found or not readable. Try again.")
.await?;
continue;
}
};
}
// not being able to read a line from the stream is unrecoverable
Some(Err(err)) => return Err(err),
// This EOF won't be hit.
None => continue,
}
}
// close connection after file has been copied to TCP stream
Ok(())
})
.map_err(|err| log::error!("Service Error: {:?}", err))
})?
.workers(2)
.run()
.await
}
#[tokio::main]
async fn main() -> io::Result<()> {
run().await?;
Ok(())
}
// alternatively:
// #[actix_rt::main]
// async fn main() -> io::Result<()> {
// run().await?;
// Ok(())
// }

View File

@ -26,7 +26,7 @@ use log::{error, info};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
async fn run() -> io::Result<()> { async fn run() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let count = Arc::new(AtomicUsize::new(0)); let count = Arc::new(AtomicUsize::new(0));

View File

@ -6,7 +6,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{ use crate::{
server::ServerCommand, server::ServerCommand,
service::{InternalServiceFactory, ServiceFactory, StreamNewService}, service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
socket::{ socket::{
create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs, create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs,
}, },
@ -142,7 +142,7 @@ impl ServerBuilder {
/// Add new service to the server. /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServerServiceFactory<TcpStream>,
U: ToSocketAddrs, U: ToSocketAddrs,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
@ -172,7 +172,7 @@ impl ServerBuilder {
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServerServiceFactory<TcpStream>,
{ {
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
let addr = lst.local_addr()?; let addr = lst.local_addr()?;
@ -213,7 +213,7 @@ impl ServerBuilder {
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServerServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>, N: AsRef<str>,
U: AsRef<std::path::Path>, U: AsRef<std::path::Path>,
{ {
@ -240,7 +240,7 @@ impl ServerBuilder {
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServerServiceFactory<actix_rt::net::UnixStream>,
{ {
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;

View File

@ -21,7 +21,7 @@ mod worker;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::handle::ServerHandle; pub use self::handle::ServerHandle;
pub use self::server::Server; pub use self::server::Server;
pub use self::service::ServiceFactory; pub use self::service::ServerServiceFactory;
pub use self::test_server::TestServer; pub use self::test_server::TestServer;
#[doc(hidden)] #[doc(hidden)]

View File

@ -1,16 +1,21 @@
use std::marker::PhantomData; use std::{
use std::net::SocketAddr; marker::PhantomData,
use std::task::{Context, Poll}; net::SocketAddr,
task::{Context, Poll},
};
use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::future::{ready, Ready}; use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use log::error; use log::error;
use crate::socket::{FromStream, MioStream}; use crate::{
use crate::worker::WorkerCounterGuard; socket::{FromStream, MioStream},
worker::WorkerCounterGuard,
};
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { #[doc(hidden)]
pub trait ServerServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: BaseServiceFactory<Stream, Config = ()>; type Factory: BaseServiceFactory<Stream, Config = ()>;
fn create(&self) -> Self::Factory; fn create(&self) -> Self::Factory;
@ -80,7 +85,7 @@ where
} }
} }
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> { pub(crate) struct StreamNewService<F: ServerServiceFactory<Io>, Io: FromStream> {
name: String, name: String,
inner: F, inner: F,
token: usize, token: usize,
@ -90,7 +95,7 @@ pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
impl<F, Io> StreamNewService<F, Io> impl<F, Io> StreamNewService<F, Io>
where where
F: ServiceFactory<Io>, F: ServerServiceFactory<Io>,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
pub(crate) fn create( pub(crate) fn create(
@ -111,7 +116,7 @@ where
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io> impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
where where
F: ServiceFactory<Io>, F: ServerServiceFactory<Io>,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
fn name(&self, _: usize) -> &str { fn name(&self, _: usize) -> &str {
@ -143,7 +148,7 @@ where
} }
} }
impl<F, T, I> ServiceFactory<I> for F impl<F, T, I> ServerServiceFactory<I> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: BaseServiceFactory<I, Config = ()>, T: BaseServiceFactory<I, Config = ()>,

View File

@ -1,19 +1,18 @@
pub(crate) use std::net::{ pub(crate) use std::net::{
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
}; };
use std::{fmt, io};
use actix_rt::net::TcpStream;
pub(crate) use mio::net::TcpListener as MioTcpListener; pub(crate) use mio::net::TcpListener as MioTcpListener;
use mio::{event::Source, Interest, Registry, Token};
#[cfg(unix)] #[cfg(unix)]
pub(crate) use { pub(crate) use {
mio::net::UnixListener as MioUnixListener, mio::net::UnixListener as MioUnixListener,
std::os::unix::net::UnixListener as StdUnixListener, std::os::unix::net::UnixListener as StdUnixListener,
}; };
use std::{fmt, io};
use actix_rt::net::TcpStream;
use mio::{event::Source, Interest, Registry, Token};
pub(crate) enum MioListener { pub(crate) enum MioListener {
Tcp(MioTcpListener), Tcp(MioTcpListener),
#[cfg(unix)] #[cfg(unix)]

View File

@ -2,7 +2,7 @@ use std::{io, net, sync::mpsc, thread};
use actix_rt::{net::TcpStream, System}; use actix_rt::{net::TcpStream, System};
use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory}; use crate::{Server, ServerBuilder, ServerHandle, ServerServiceFactory};
/// A testing server. /// A testing server.
/// ///
@ -66,7 +66,7 @@ impl TestServer {
} }
/// Start new test server with application factory. /// Start new test server with application factory.
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime { pub fn with<F: ServerServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
// run server in separate thread // run server in separate thread

View File

@ -26,25 +26,88 @@ fn test_bind() {
let srv = Server::build() let srv = Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.shutdown_timeout(3600)
.bind("test", addr, move || { .bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) }) fn_service(|_| async { Ok::<_, ()>(()) })
})? })?
.run(); .run();
let _ = tx.send(srv.handle()); tx.send(srv.handle()).unwrap();
srv.await srv.await
}) })
}); });
let srv = rx.recv().unwrap(); let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
net::TcpStream::connect(addr).unwrap();
let _ = srv.stop(true); let _ = srv.stop(true);
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
#[test]
fn test_listen() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let lst = net::TcpListener::bind(addr).unwrap();
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.shutdown_timeout(3600)
.listen("test", lst, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
tx.send(srv.handle()).unwrap();
srv.await
})
});
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
net::TcpStream::connect(addr).unwrap();
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
// #[test]
// fn test_bind() {
// let addr = unused_addr();
// let (tx, rx) = mpsc::channel();
// let h = thread::spawn(move || {
// actix_rt::System::new().block_on(async {
// let srv = Server::build()
// .workers(1)
// .disable_signals()
// .bind("test", addr, move || {
// fn_service(|_| async { Ok::<_, ()>(()) })
// })?
// .run();
// let _ = tx.send(srv.handle());
// srv.await
// })
// });
// let srv = rx.recv().unwrap();
// thread::sleep(Duration::from_millis(500));
// assert!(net::TcpStream::connect(addr).is_ok());
// let _ = srv.stop(true);
// h.join().unwrap().unwrap();
// }
#[test] #[test]
fn plain_tokio_runtime() { fn plain_tokio_runtime() {
let addr = unused_addr(); let addr = unused_addr();
@ -80,37 +143,37 @@ fn plain_tokio_runtime() {
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
#[test] // #[test]
fn test_listen() { // fn test_listen() {
let addr = unused_addr(); // let addr = unused_addr();
let lst = net::TcpListener::bind(addr).unwrap(); // let lst = net::TcpListener::bind(addr).unwrap();
let (tx, rx) = mpsc::channel(); // let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { // let h = thread::spawn(move || {
actix_rt::System::new().block_on(async { // actix_rt::System::new().block_on(async {
let srv = Server::build() // let srv = Server::build()
.disable_signals() // .disable_signals()
.workers(1) // .workers(1)
.listen("test", lst, move || { // .listen("test", lst, move || {
fn_service(|_| async { Ok::<_, ()>(()) }) // fn_service(|_| async { Ok::<_, ()>(()) })
})? // })?
.run(); // .run();
let _ = tx.send(srv.handle()); // let _ = tx.send(srv.handle());
srv.await // srv.await
}) // })
}); // });
let srv = rx.recv().unwrap(); // let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500)); // thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok()); // assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true); // let _ = srv.stop(true);
h.join().unwrap().unwrap(); // h.join().unwrap().unwrap();
} // }
#[test] #[test]
#[cfg(unix)] #[cfg(unix)]