1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-26 19:47:43 +02:00

move and update server+tls examples (#190)

This commit is contained in:
Rob Ede
2020-09-13 10:12:07 +01:00
committed by GitHub
parent 681eeb497d
commit fb0aa02b3c
14 changed files with 241 additions and 217 deletions

View File

@ -41,3 +41,4 @@ mio-uds = { version = "0.6.7" }
bytes = "0.5"
env_logger = "0.7"
actix-testing = "1.0.0"
tokio = { version = "0.2", features = ["io-util"] }

View File

@ -0,0 +1,88 @@
//! Simple composite-service TCP echo server.
//!
//! Using the following command:
//!
//! ```sh
//! nc 127.0.0.1 8080
//! ```
//!
//! Start typing. When you press enter the typed line will be echoed back. The server will log
//! the length of each line it echos and the total size of data sent when the connection is closed.
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::{env, io};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::pipeline_factory;
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "actix=trace,basic=trace");
env_logger::init();
let count = Arc::new(AtomicUsize::new(0));
let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);
// Bind socket address and start worker(s). By default, the server uses the number of available
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker.
Server::build()
.bind("echo", addr, move || {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);
pipeline_factory(move |mut stream: TcpStream| {
let count = Arc::clone(&count);
async move {
let num = count.fetch_add(1, Ordering::SeqCst);
let num = num + 1;
let mut size = 0;
let mut buf = BytesMut::new();
loop {
match stream.read_buf(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,
// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
}
}
}
// send data down service pipeline
Ok((buf.freeze(), size))
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
ok(size)
})
})?
.workers(1)
.run()
.await
}

View File

@ -1,6 +1,6 @@
//! General purpose tcp server
#![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)]
//! General purpose TCP server.
#![deny(rust_2018_idioms)]
mod accept;
mod builder;
@ -19,7 +19,7 @@ pub use self::service::ServiceFactory;
#[doc(hidden)]
pub use self::socket::FromStream;
/// Socket id token
/// Socket ID token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize);

View File

@ -17,8 +17,10 @@ use crate::socket::{FromStream, StdStream};
pub(crate) enum ServerMessage {
/// New stream
Connect(StdStream),
/// Gracefull shutdown
/// Gracefully shutdown
Shutdown(Duration),
/// Force shutdown
ForceShutdown,
}

View File

@ -303,6 +303,7 @@ enum WorkerState {
Restarting(
usize,
Token,
#[allow(clippy::type_complexity)]
Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
),
Shutdown(