mirror of
https://github.com/fafhrd91/actix-net
synced 2025-01-31 09:12:08 +01:00
add actix stream trait (#276)
This commit is contained in:
parent
2cfe1d88ad
commit
75d7ae3139
@ -77,6 +77,44 @@ pub mod net {
|
|||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
|
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
|
||||||
|
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
/// Trait for generic over tokio stream types and various wrapper types around them.
|
||||||
|
pub trait ActixStream: AsyncRead + AsyncWrite + Unpin + 'static {
|
||||||
|
/// poll stream and check read readiness of Self.
|
||||||
|
///
|
||||||
|
/// See [tokio::net::TcpStream::poll_read_ready] for detail
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>>;
|
||||||
|
|
||||||
|
/// poll stream and check write readiness of Self.
|
||||||
|
///
|
||||||
|
/// See [tokio::net::TcpStream::poll_write_ready] for detail
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActixStream for TcpStream {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
TcpStream::poll_read_ready(self, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
TcpStream::poll_write_ready(self, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
impl ActixStream for UnixStream {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
UnixStream::poll_read_ready(self, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
UnixStream::poll_write_ready(self, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod time {
|
pub mod time {
|
||||||
|
@ -29,9 +29,10 @@ use std::{
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use actix_rt::net::TcpStream;
|
||||||
use actix_server::Server;
|
use actix_server::Server;
|
||||||
use actix_service::pipeline_factory;
|
use actix_service::pipeline_factory;
|
||||||
use actix_tls::accept::rustls::Acceptor as RustlsAcceptor;
|
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
|
||||||
use futures_util::future::ok;
|
use futures_util::future::ok;
|
||||||
use log::info;
|
use log::info;
|
||||||
use rustls::{
|
use rustls::{
|
||||||
@ -74,9 +75,9 @@ async fn main() -> io::Result<()> {
|
|||||||
// Set up TLS service factory
|
// Set up TLS service factory
|
||||||
pipeline_factory(tls_acceptor.clone())
|
pipeline_factory(tls_acceptor.clone())
|
||||||
.map_err(|err| println!("Rustls error: {:?}", err))
|
.map_err(|err| println!("Rustls error: {:?}", err))
|
||||||
.and_then(move |stream| {
|
.and_then(move |stream: TlsStream<TcpStream>| {
|
||||||
let num = count.fetch_add(1, Ordering::Relaxed);
|
let num = count.fetch_add(1, Ordering::Relaxed);
|
||||||
info!("[{}] Got TLS connection: {:?}", num, stream);
|
info!("[{}] Got TLS connection: {:?}", num, &*stream);
|
||||||
ok(())
|
ok(())
|
||||||
})
|
})
|
||||||
})?
|
})?
|
||||||
|
@ -1,15 +1,94 @@
|
|||||||
use std::task::{Context, Poll};
|
use std::{
|
||||||
|
io::{self, IoSlice},
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
pin::Pin,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use actix_rt::net::ActixStream;
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::Counter;
|
use actix_utils::counter::Counter;
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
|
||||||
pub use tokio_native_tls::native_tls::Error;
|
pub use tokio_native_tls::native_tls::Error;
|
||||||
pub use tokio_native_tls::{TlsAcceptor, TlsStream};
|
pub use tokio_native_tls::TlsAcceptor;
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::MAX_CONN_COUNTER;
|
||||||
|
|
||||||
|
/// wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
|
||||||
|
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
|
||||||
|
|
||||||
|
impl<T> From<tokio_native_tls::TlsStream<T>> for TlsStream<T> {
|
||||||
|
fn from(stream: tokio_native_tls::TlsStream<T>) -> Self {
|
||||||
|
Self(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> Deref for TlsStream<T> {
|
||||||
|
type Target = tokio_native_tls::TlsStream<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> DerefMut for TlsStream<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(&**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_read_ready((&**self).get_ref().get_ref().get_ref(), cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_write_ready((&**self).get_ref().get_ref().get_ref(), cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Accept TLS connections via `native-tls` package.
|
/// Accept TLS connections via `native-tls` package.
|
||||||
///
|
///
|
||||||
/// `native-tls` feature enables this `Acceptor` type.
|
/// `native-tls` feature enables this `Acceptor` type.
|
||||||
@ -34,10 +113,7 @@ impl Clone for Acceptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ServiceFactory<T> for Acceptor
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
@ -71,10 +147,7 @@ impl Clone for NativeTlsAcceptorService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Service<T> for NativeTlsAcceptorService
|
impl<T: ActixStream> Service<T> for NativeTlsAcceptorService {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
|
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
|
||||||
@ -93,7 +166,7 @@ where
|
|||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let io = this.acceptor.accept(io).await;
|
let io = this.acceptor.accept(io).await;
|
||||||
drop(guard);
|
drop(guard);
|
||||||
io
|
io.map(Into::into)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
|
io::{self, IoSlice},
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use actix_rt::net::ActixStream;
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::{Counter, CounterGuard};
|
use actix_utils::counter::{Counter, CounterGuard};
|
||||||
use futures_core::{future::LocalBoxFuture, ready};
|
use futures_core::{future::LocalBoxFuture, ready};
|
||||||
@ -12,10 +15,82 @@ use futures_core::{future::LocalBoxFuture, ready};
|
|||||||
pub use openssl::ssl::{
|
pub use openssl::ssl::{
|
||||||
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
||||||
};
|
};
|
||||||
pub use tokio_openssl::SslStream;
|
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::MAX_CONN_COUNTER;
|
||||||
|
|
||||||
|
/// wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||||
|
pub struct SslStream<T>(tokio_openssl::SslStream<T>);
|
||||||
|
|
||||||
|
impl<T> From<tokio_openssl::SslStream<T>> for SslStream<T> {
|
||||||
|
fn from(stream: tokio_openssl::SslStream<T>) -> Self {
|
||||||
|
Self(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for SslStream<T> {
|
||||||
|
type Target = tokio_openssl::SslStream<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for SslStream<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncRead for SslStream<T> {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncWrite for SslStream<T> {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(&**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> ActixStream for SslStream<T> {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_read_ready((&**self).get_ref(), cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_write_ready((&**self).get_ref(), cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Accept TLS connections via `openssl` package.
|
/// Accept TLS connections via `openssl` package.
|
||||||
///
|
///
|
||||||
/// `openssl` feature enables this `Acceptor` type.
|
/// `openssl` feature enables this `Acceptor` type.
|
||||||
@ -40,10 +115,7 @@ impl Clone for Acceptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ServiceFactory<T> for Acceptor
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = SslStream<T>;
|
type Response = SslStream<T>;
|
||||||
type Error = SslError;
|
type Error = SslError;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
@ -67,10 +139,7 @@ pub struct AcceptorService {
|
|||||||
conns: Counter,
|
conns: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Service<T> for AcceptorService
|
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = SslStream<T>;
|
type Response = SslStream<T>;
|
||||||
type Error = SslError;
|
type Error = SslError;
|
||||||
type Future = AcceptorServiceResponse<T>;
|
type Future = AcceptorServiceResponse<T>;
|
||||||
@ -88,24 +157,25 @@ where
|
|||||||
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
||||||
AcceptorServiceResponse {
|
AcceptorServiceResponse {
|
||||||
_guard: self.conns.get(),
|
_guard: self.conns.get(),
|
||||||
stream: Some(SslStream::new(ssl, io).unwrap()),
|
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct AcceptorServiceResponse<T>
|
pub struct AcceptorServiceResponse<T: ActixStream> {
|
||||||
where
|
stream: Option<tokio_openssl::SslStream<T>>,
|
||||||
T: AsyncRead + AsyncWrite,
|
|
||||||
{
|
|
||||||
stream: Option<SslStream<T>>,
|
|
||||||
_guard: CounterGuard,
|
_guard: CounterGuard,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
|
impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
|
||||||
type Output = Result<SslStream<T>, SslError>;
|
type Output = Result<SslStream<T>, SslError>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
|
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
|
||||||
Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved.")))
|
Poll::Ready(Ok(self
|
||||||
|
.stream
|
||||||
|
.take()
|
||||||
|
.expect("SSL connect has resolved.")
|
||||||
|
.into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,22 +1,96 @@
|
|||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
io,
|
io::{self, IoSlice},
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use actix_rt::net::ActixStream;
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::{Counter, CounterGuard};
|
use actix_utils::counter::{Counter, CounterGuard};
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
use tokio_rustls::{Accept, TlsAcceptor};
|
use tokio_rustls::{Accept, TlsAcceptor};
|
||||||
|
|
||||||
pub use tokio_rustls::rustls::{ServerConfig, Session};
|
pub use tokio_rustls::rustls::{ServerConfig, Session};
|
||||||
pub use tokio_rustls::server::TlsStream;
|
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::MAX_CONN_COUNTER;
|
||||||
|
|
||||||
|
/// wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||||
|
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
|
||||||
|
|
||||||
|
impl<T> From<tokio_rustls::server::TlsStream<T>> for TlsStream<T> {
|
||||||
|
fn from(stream: tokio_rustls::server::TlsStream<T>) -> Self {
|
||||||
|
Self(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for TlsStream<T> {
|
||||||
|
type Target = tokio_rustls::server::TlsStream<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for TlsStream<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(&**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_read_ready((&**self).get_ref().0, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_write_ready((&**self).get_ref().0, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Accept TLS connections via `rustls` package.
|
/// Accept TLS connections via `rustls` package.
|
||||||
///
|
///
|
||||||
/// `rustls` feature enables this `Acceptor` type.
|
/// `rustls` feature enables this `Acceptor` type.
|
||||||
@ -43,10 +117,7 @@ impl Clone for Acceptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ServiceFactory<T> for Acceptor
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
@ -72,10 +143,7 @@ pub struct AcceptorService {
|
|||||||
conns: Counter,
|
conns: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Service<T> for AcceptorService
|
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
type Future = AcceptorServiceFut<T>;
|
type Future = AcceptorServiceFut<T>;
|
||||||
@ -96,22 +164,16 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct AcceptorServiceFut<T>
|
pub struct AcceptorServiceFut<T: ActixStream> {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
fut: Accept<T>,
|
fut: Accept<T>,
|
||||||
_guard: CounterGuard,
|
_guard: CounterGuard,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Future for AcceptorServiceFut<T>
|
impl<T: ActixStream> Future for AcceptorServiceFut<T> {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
type Output = Result<TlsStream<T>, io::Error>;
|
type Output = Result<TlsStream<T>, io::Error>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.get_mut();
|
let this = self.get_mut();
|
||||||
Pin::new(&mut this.fut).poll(cx)
|
Pin::new(&mut this.fut).poll(cx).map_ok(TlsStream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user