mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-23 22:51:07 +01:00
normalize logs capital letter (#463)
Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
parent
283974f3e6
commit
126ed4c2e3
@ -74,7 +74,7 @@ async fn run() -> io::Result<()> {
|
|||||||
// close connection after file has been copied to TCP stream
|
// close connection after file has been copied to TCP stream
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.map_err(|err| tracing::error!("Service Error: {:?}", err))
|
.map_err(|err| tracing::error!("service error: {:?}", err))
|
||||||
})?
|
})?
|
||||||
.workers(2)
|
.workers(2)
|
||||||
.run()
|
.run()
|
||||||
|
@ -64,7 +64,7 @@ async fn run() -> io::Result<()> {
|
|||||||
|
|
||||||
// stream error; bail from loop with error
|
// stream error; bail from loop with error
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
tracing::error!("Stream Error: {:?}", err);
|
tracing::error!("stream error: {:?}", err);
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -74,7 +74,7 @@ async fn run() -> io::Result<()> {
|
|||||||
Ok((buf.freeze(), size))
|
Ok((buf.freeze(), size))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.map_err(|err| tracing::error!("Service Error: {:?}", err))
|
.map_err(|err| tracing::error!("service error: {:?}", err))
|
||||||
.and_then(move |(_, size)| {
|
.and_then(move |(_, size)| {
|
||||||
let num = num2.load(Ordering::SeqCst);
|
let num = num2.load(Ordering::SeqCst);
|
||||||
tracing::info!("[{}] total bytes read: {}", num, size);
|
tracing::info!("[{}] total bytes read: {}", num, size);
|
||||||
|
@ -140,7 +140,7 @@ impl Accept {
|
|||||||
WAKER_TOKEN => {
|
WAKER_TOKEN => {
|
||||||
let exit = self.handle_waker(sockets);
|
let exit = self.handle_waker(sockets);
|
||||||
if exit {
|
if exit {
|
||||||
info!("Accept thread stopped");
|
info!("accept thread stopped");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -297,16 +297,16 @@ impl Accept {
|
|||||||
|
|
||||||
fn register_logged(&self, info: &mut ServerSocketInfo) {
|
fn register_logged(&self, info: &mut ServerSocketInfo) {
|
||||||
match self.register(info) {
|
match self.register(info) {
|
||||||
Ok(_) => debug!("Resume accepting connections on {}", info.lst.local_addr()),
|
Ok(_) => debug!("resume accepting connections on {}", info.lst.local_addr()),
|
||||||
Err(err) => error!("Can not register server socket {}", err),
|
Err(err) => error!("can not register server socket {}", err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
|
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
|
||||||
match self.poll.registry().deregister(&mut info.lst) {
|
match self.poll.registry().deregister(&mut info.lst) {
|
||||||
Ok(_) => debug!("Paused accepting connections on {}", info.lst.local_addr()),
|
Ok(_) => debug!("paused accepting connections on {}", info.lst.local_addr()),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Can not deregister server socket {}", err)
|
error!("can not deregister server socket {}", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -350,7 +350,7 @@ impl Accept {
|
|||||||
self.remove_next();
|
self.remove_next();
|
||||||
|
|
||||||
if self.handles.is_empty() {
|
if self.handles.is_empty() {
|
||||||
error!("No workers");
|
error!("no workers");
|
||||||
// All workers are gone and Conn is nowhere to be sent.
|
// All workers are gone and Conn is nowhere to be sent.
|
||||||
// Treat this situation as Ok and drop Conn.
|
// Treat this situation as Ok and drop Conn.
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@ -399,7 +399,7 @@ impl Accept {
|
|||||||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return,
|
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return,
|
||||||
Err(ref err) if connection_error(err) => continue,
|
Err(ref err) if connection_error(err) => continue,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Error accepting connection: {}", err);
|
error!("error accepting connection: {}", err);
|
||||||
|
|
||||||
// deregister listener temporary
|
// deregister listener temporary
|
||||||
self.deregister_logged(info);
|
self.deregister_logged(info);
|
||||||
|
@ -197,7 +197,7 @@ impl ServerBuilder {
|
|||||||
if self.sockets.is_empty() {
|
if self.sockets.is_empty() {
|
||||||
panic!("Server should have at least one bound socket");
|
panic!("Server should have at least one bound socket");
|
||||||
} else {
|
} else {
|
||||||
info!("Starting {} workers", self.threads);
|
info!("starting {} workers", self.threads);
|
||||||
Server::new(self)
|
Server::new(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -200,7 +200,7 @@ impl ServerInner {
|
|||||||
|
|
||||||
for (_, name, lst) in &builder.sockets {
|
for (_, name, lst) in &builder.sockets {
|
||||||
info!(
|
info!(
|
||||||
r#"Starting service: "{}", workers: {}, listening on: {}"#,
|
r#"starting service: "{}", workers: {}, listening on: {}"#,
|
||||||
name,
|
name,
|
||||||
builder.threads,
|
builder.threads,
|
||||||
lst.local_addr()
|
lst.local_addr()
|
||||||
@ -283,7 +283,7 @@ impl ServerInner {
|
|||||||
// TODO: maybe just return with warning log if not found ?
|
// TODO: maybe just return with warning log if not found ?
|
||||||
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
|
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
|
||||||
|
|
||||||
error!("Worker {} has died; restarting", idx);
|
error!("worker {} has died; restarting", idx);
|
||||||
|
|
||||||
let factories = self
|
let factories = self
|
||||||
.services
|
.services
|
||||||
|
@ -78,7 +78,7 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Can not convert to an async tcp stream: {}", err);
|
error!("can not convert to an async TCP stream: {}", err);
|
||||||
Err(())
|
Err(())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -70,7 +70,7 @@ impl Signals {
|
|||||||
.map(|tokio_sig| (*sig, tokio_sig))
|
.map(|tokio_sig| (*sig, tokio_sig))
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
"Can not initialize stream handler for {:?} err: {}",
|
"can not initialize stream handler for {:?} err: {}",
|
||||||
sig,
|
sig,
|
||||||
e
|
e
|
||||||
)
|
)
|
||||||
|
@ -337,7 +337,7 @@ impl ServerWorker {
|
|||||||
Ok((token, svc)) => services.push((idx, token, svc)),
|
Ok((token, svc)) => services.push((idx, token, svc)),
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Can not start worker: {:?}", err);
|
error!("can not start worker: {:?}", err);
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
format!("can not start server service {}", idx),
|
format!("can not start server service {}", idx),
|
||||||
@ -436,7 +436,7 @@ impl ServerWorker {
|
|||||||
Ok((token, svc)) => services.push((idx, token, svc)),
|
Ok((token, svc)) => services.push((idx, token, svc)),
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Can not start worker: {:?}", err);
|
error!("can not start worker: {:?}", err);
|
||||||
Arbiter::current().stop();
|
Arbiter::current().stop();
|
||||||
factory_tx
|
factory_tx
|
||||||
.send(Err(io::Error::new(
|
.send(Err(io::Error::new(
|
||||||
@ -476,7 +476,7 @@ impl ServerWorker {
|
|||||||
|
|
||||||
fn restart_service(&mut self, idx: usize, factory_id: usize) {
|
fn restart_service(&mut self, idx: usize, factory_id: usize) {
|
||||||
let factory = &self.factories[factory_id];
|
let factory = &self.factories[factory_id];
|
||||||
trace!("Service {:?} failed, restarting", factory.name(idx));
|
trace!("service {:?} failed, restarting", factory.name(idx));
|
||||||
self.services[idx].status = WorkerServiceStatus::Restarting;
|
self.services[idx].status = WorkerServiceStatus::Restarting;
|
||||||
self.state = WorkerState::Restarting(Restart {
|
self.state = WorkerState::Restarting(Restart {
|
||||||
factory_id,
|
factory_id,
|
||||||
@ -508,7 +508,7 @@ impl ServerWorker {
|
|||||||
Poll::Ready(Ok(_)) => {
|
Poll::Ready(Ok(_)) => {
|
||||||
if srv.status == WorkerServiceStatus::Unavailable {
|
if srv.status == WorkerServiceStatus::Unavailable {
|
||||||
trace!(
|
trace!(
|
||||||
"Service {:?} is available",
|
"service {:?} is available",
|
||||||
self.factories[srv.factory_idx].name(idx)
|
self.factories[srv.factory_idx].name(idx)
|
||||||
);
|
);
|
||||||
srv.status = WorkerServiceStatus::Available;
|
srv.status = WorkerServiceStatus::Available;
|
||||||
@ -519,7 +519,7 @@ impl ServerWorker {
|
|||||||
|
|
||||||
if srv.status == WorkerServiceStatus::Available {
|
if srv.status == WorkerServiceStatus::Available {
|
||||||
trace!(
|
trace!(
|
||||||
"Service {:?} is unavailable",
|
"service {:?} is unavailable",
|
||||||
self.factories[srv.factory_idx].name(idx)
|
self.factories[srv.factory_idx].name(idx)
|
||||||
);
|
);
|
||||||
srv.status = WorkerServiceStatus::Unavailable;
|
srv.status = WorkerServiceStatus::Unavailable;
|
||||||
@ -527,7 +527,7 @@ impl ServerWorker {
|
|||||||
}
|
}
|
||||||
Poll::Ready(Err(_)) => {
|
Poll::Ready(Err(_)) => {
|
||||||
error!(
|
error!(
|
||||||
"Service {:?} readiness check returned error, restarting",
|
"service {:?} readiness check returned error, restarting",
|
||||||
self.factories[srv.factory_idx].name(idx)
|
self.factories[srv.factory_idx].name(idx)
|
||||||
);
|
);
|
||||||
srv.status = WorkerServiceStatus::Failed;
|
srv.status = WorkerServiceStatus::Failed;
|
||||||
@ -590,11 +590,11 @@ impl Future for ServerWorker {
|
|||||||
{
|
{
|
||||||
let num = this.counter.total();
|
let num = this.counter.total();
|
||||||
if num == 0 {
|
if num == 0 {
|
||||||
info!("Shutting down idle worker");
|
info!("shutting down idle worker");
|
||||||
let _ = tx.send(true);
|
let _ = tx.send(true);
|
||||||
return Poll::Ready(());
|
return Poll::Ready(());
|
||||||
} else if graceful {
|
} else if graceful {
|
||||||
info!("Graceful worker shutdown; finishing {} connections", num);
|
info!("graceful worker shutdown; finishing {} connections", num);
|
||||||
this.shutdown(false);
|
this.shutdown(false);
|
||||||
|
|
||||||
this.state = WorkerState::Shutdown(Shutdown {
|
this.state = WorkerState::Shutdown(Shutdown {
|
||||||
@ -603,7 +603,7 @@ impl Future for ServerWorker {
|
|||||||
tx,
|
tx,
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
info!("Force shutdown worker, closing {} connections", num);
|
info!("force shutdown worker, closing {} connections", num);
|
||||||
this.shutdown(true);
|
this.shutdown(true);
|
||||||
|
|
||||||
let _ = tx.send(false);
|
let _ = tx.send(false);
|
||||||
@ -638,7 +638,7 @@ impl Future for ServerWorker {
|
|||||||
assert_eq!(token, token_new);
|
assert_eq!(token, token_new);
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
"Service {:?} has been restarted",
|
"service {:?} has been restarted",
|
||||||
this.factories[factory_id].name(token)
|
this.factories[factory_id].name(token)
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -685,7 +685,7 @@ impl Future for ServerWorker {
|
|||||||
match this.check_readiness(cx) {
|
match this.check_readiness(cx) {
|
||||||
Ok(true) => {}
|
Ok(true) => {}
|
||||||
Ok(false) => {
|
Ok(false) => {
|
||||||
trace!("Worker is unavailable");
|
trace!("worker is unavailable");
|
||||||
this.state = WorkerState::Unavailable;
|
this.state = WorkerState::Unavailable;
|
||||||
return self.poll(cx);
|
return self.poll(cx);
|
||||||
}
|
}
|
||||||
|
@ -186,9 +186,9 @@ fn test_start() {
|
|||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_max_concurrent_connections() {
|
async fn test_max_concurrent_connections() {
|
||||||
// Note:
|
// Note:
|
||||||
// A tcp listener would accept connects based on it's backlog setting.
|
// A TCP listener would accept connects based on it's backlog setting.
|
||||||
//
|
//
|
||||||
// The limit test on the other hand is only for concurrent tcp stream limiting a work
|
// The limit test on the other hand is only for concurrent TCP stream limiting a work
|
||||||
// thread accept.
|
// thread accept.
|
||||||
|
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
@ -74,16 +74,16 @@ where
|
|||||||
let connector = self.connector.clone();
|
let connector = self.connector.clone();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
trace!("SSL Handshake start for: {:?}", stream.hostname());
|
trace!("TLS handshake start for: {:?}", stream.hostname());
|
||||||
connector
|
connector
|
||||||
.connect(stream.hostname(), io)
|
.connect(stream.hostname(), io)
|
||||||
.await
|
.await
|
||||||
.map(|res| {
|
.map(|res| {
|
||||||
trace!("SSL Handshake success: {:?}", stream.hostname());
|
trace!("TLS handshake success: {:?}", stream.hostname());
|
||||||
stream.replace_io(res).1
|
stream.replace_io(res).1
|
||||||
})
|
})
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
trace!("SSL Handshake error: {:?}", e);
|
trace!("TLS handshake error: {:?}", e);
|
||||||
io::Error::new(io::ErrorKind::Other, format!("{}", e))
|
io::Error::new(io::ErrorKind::Other, format!("{}", e))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -97,7 +97,8 @@ where
|
|||||||
actix_service::always_ready!();
|
actix_service::always_ready!();
|
||||||
|
|
||||||
fn call(&self, stream: Connection<R, IO>) -> Self::Future {
|
fn call(&self, stream: Connection<R, IO>) -> Self::Future {
|
||||||
trace!("SSL Handshake start for: {:?}", stream.hostname());
|
trace!("TLS handshake start for: {:?}", stream.hostname());
|
||||||
|
|
||||||
let (io, stream) = stream.replace_io(());
|
let (io, stream) = stream.replace_io(());
|
||||||
let host = stream.hostname();
|
let host = stream.hostname();
|
||||||
|
|
||||||
@ -137,11 +138,11 @@ where
|
|||||||
match ready!(Pin::new(this.io.as_mut().unwrap()).poll_connect(cx)) {
|
match ready!(Pin::new(this.io.as_mut().unwrap()).poll_connect(cx)) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let stream = this.stream.take().unwrap();
|
let stream = this.stream.take().unwrap();
|
||||||
trace!("SSL Handshake success: {:?}", stream.hostname());
|
trace!("TLS handshake success: {:?}", stream.hostname());
|
||||||
Poll::Ready(Ok(stream.replace_io(this.io.take().unwrap()).1))
|
Poll::Ready(Ok(stream.replace_io(this.io.take().unwrap()).1))
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
trace!("SSL Handshake error: {:?}", err);
|
trace!("TLS handshake error: {:?}", err);
|
||||||
Poll::Ready(Err(io::Error::new(
|
Poll::Ready(Err(io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
format!("{}", err),
|
format!("{}", err),
|
||||||
|
@ -101,7 +101,7 @@ where
|
|||||||
actix_service::always_ready!();
|
actix_service::always_ready!();
|
||||||
|
|
||||||
fn call(&self, connection: Connection<R, IO>) -> Self::Future {
|
fn call(&self, connection: Connection<R, IO>) -> Self::Future {
|
||||||
trace!("SSL Handshake start for: {:?}", connection.hostname());
|
trace!("TLS handshake start for: {:?}", connection.hostname());
|
||||||
let (stream, connection) = connection.replace_io(());
|
let (stream, connection) = connection.replace_io(());
|
||||||
|
|
||||||
match ServerName::try_from(connection.hostname()) {
|
match ServerName::try_from(connection.hostname()) {
|
||||||
@ -140,7 +140,7 @@ where
|
|||||||
Self::Future { connect, connection } => {
|
Self::Future { connect, connection } => {
|
||||||
let stream = ready!(Pin::new(connect).poll(cx))?;
|
let stream = ready!(Pin::new(connect).poll(cx))?;
|
||||||
let connection = connection.take().unwrap();
|
let connection = connection.take().unwrap();
|
||||||
trace!("SSL Handshake success: {:?}", connection.hostname());
|
trace!("TLS handshake success: {:?}", connection.hostname());
|
||||||
Poll::Ready(Ok(connection.replace_io(stream).1))
|
Poll::Ready(Ok(connection.replace_io(stream).1))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,8 +114,8 @@ impl<R: Host> TcpConnectorFut<R> {
|
|||||||
stream: ReusableBoxFuture::new(connect(addr, local_addr)),
|
stream: ReusableBoxFuture::new(connect(addr, local_addr)),
|
||||||
},
|
},
|
||||||
|
|
||||||
// when resolver returns multiple socket addr for request they would be popped from
|
// When resolver returns multiple socket addr for request they would be popped from
|
||||||
// front end of queue and returns with the first successful tcp connection.
|
// front end of queue and returns with the first successful TCP connection.
|
||||||
ConnectAddrs::Multi(mut addrs) => {
|
ConnectAddrs::Multi(mut addrs) => {
|
||||||
let addr = addrs.pop_front().unwrap();
|
let addr = addrs.pop_front().unwrap();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user