1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

refactor http service builder

This commit is contained in:
Nikolay Kim 2018-09-26 20:43:54 -07:00
parent 0aa0f326f7
commit 9f1417af30
10 changed files with 435 additions and 355 deletions

View File

@ -61,6 +61,7 @@ flate2-rust = ["flate2/rust_backend"]
[dependencies]
actix = "0.7.0"
actix-net = { git="https://github.com/actix/actix-net.git" }
#actix-net = { path = "../actix-net" }
base64 = "0.9"
bitflags = "1.0"

View File

@ -1127,12 +1127,23 @@ mod tests {
let resp: HttpResponse = HttpResponse::Ok().into();
let resp = cors.response(&req, resp).unwrap().response();
let origins_str = resp.headers().get(header::ACCESS_CONTROL_ALLOW_ORIGIN).unwrap().to_str().unwrap();
let origins_str = resp
.headers()
.get(header::ACCESS_CONTROL_ALLOW_ORIGIN)
.unwrap()
.to_str()
.unwrap();
if origins_str.starts_with("https://www.example.com") {
assert_eq!("https://www.example.com, https://www.google.com", origins_str);
assert_eq!(
"https://www.example.com, https://www.google.com",
origins_str
);
} else {
assert_eq!("https://www.google.com, https://www.example.com", origins_str);
assert_eq!(
"https://www.google.com, https://www.example.com",
origins_str
);
}
}

View File

@ -1,8 +1,8 @@
//! Payload stream
use bytes::{Bytes, BytesMut};
use futures::task::Task;
#[cfg(not(test))]
use futures::task::current as current_task;
use futures::task::Task;
use futures::{Async, Poll, Stream};
use std::cell::RefCell;
use std::cmp;

257
src/server/builder.rs Normal file
View File

@ -0,0 +1,257 @@
use std::marker::PhantomData;
use std::net;
use actix_net::server;
use actix_net::service::{NewService, NewServiceExt, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use tokio_tcp::TcpStream;
use super::handler::IntoHttpHandler;
use super::service::HttpService;
use super::{IoStream, KeepAlive};
pub(crate) trait ServiceFactory<H>
where
H: IntoHttpHandler,
{
fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server;
}
pub struct HttpServiceBuilder<F, H, A, P>
where
F: Fn() -> H + Send + Clone,
{
factory: F,
acceptor: A,
pipeline: P,
}
impl<F, H, A, P> HttpServiceBuilder<F, H, A, P>
where
F: Fn() -> H + Send + Clone,
H: IntoHttpHandler,
A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>,
{
pub fn new(factory: F, acceptor: A, pipeline: P) -> Self {
Self {
factory,
pipeline,
acceptor,
}
}
pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<F, H, A1, P>
where
A1: AcceptorServiceFactory,
{
HttpServiceBuilder {
acceptor,
pipeline: self.pipeline,
factory: self.factory.clone(),
}
}
pub fn pipeline<P1>(self, pipeline: P1) -> HttpServiceBuilder<F, H, A, P1>
where
P1: HttpPipelineFactory,
{
HttpServiceBuilder {
pipeline,
acceptor: self.acceptor,
factory: self.factory.clone(),
}
}
fn finish(&self) -> impl server::StreamServiceFactory {
let pipeline = self.pipeline.clone();
let acceptor = self.acceptor.clone();
move || acceptor.create().and_then(pipeline.create())
}
}
impl<F, H, A, P> Clone for HttpServiceBuilder<F, H, A, P>
where
F: Fn() -> H + Send + Clone,
A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>,
{
fn clone(&self) -> Self {
HttpServiceBuilder {
factory: self.factory.clone(),
acceptor: self.acceptor.clone(),
pipeline: self.pipeline.clone(),
}
}
}
impl<F, H, A, P> ServiceFactory<H> for HttpServiceBuilder<F, H, A, P>
where
F: Fn() -> H + Send + Clone,
A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>,
H: IntoHttpHandler,
{
fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server {
server.listen("actix-web", lst, self.finish())
}
}
pub trait AcceptorServiceFactory: Send + Clone + 'static {
type Io: IoStream + Send;
type NewService: NewService<
Request = TcpStream,
Response = Self::Io,
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> AcceptorServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T::Response: IoStream + Send,
T: NewService<Request = TcpStream, Error = (), InitError = ()>,
{
type Io = T::Response;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
pub trait HttpPipelineFactory: Send + Clone + 'static {
type Io: IoStream;
type NewService: NewService<
Request = Self::Io,
Response = (),
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> HttpPipelineFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<Response = (), Error = (), InitError = ()>,
T::Request: IoStream,
{
type Io = T::Request;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
pub(crate) struct DefaultPipelineFactory<F, H, Io>
where
F: Fn() -> H + Send + Clone,
{
factory: F,
host: Option<String>,
addr: net::SocketAddr,
keep_alive: KeepAlive,
_t: PhantomData<Io>,
}
impl<F, H, Io> DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
pub fn new(
factory: F, host: Option<String>, addr: net::SocketAddr, keep_alive: KeepAlive,
) -> Self {
Self {
factory,
addr,
keep_alive,
host,
_t: PhantomData,
}
}
}
impl<F, H, Io> Clone for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream,
F: Fn() -> H + Send + Clone,
H: IntoHttpHandler,
{
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
addr: self.addr,
keep_alive: self.keep_alive,
host: self.host.clone(),
_t: PhantomData,
}
}
}
impl<F, H, Io> HttpPipelineFactory for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
type Io = Io;
type NewService = HttpService<F, H, Io>;
fn create(&self) -> Self::NewService {
HttpService::new(
self.factory.clone(),
self.addr,
self.host.clone(),
self.keep_alive,
)
}
}
#[derive(Clone)]
pub(crate) struct DefaultAcceptor;
impl AcceptorServiceFactory for DefaultAcceptor {
type Io = TcpStream;
type NewService = DefaultAcceptor;
fn create(&self) -> Self::NewService {
DefaultAcceptor
}
}
impl NewService for DefaultAcceptor {
type Request = TcpStream;
type Response = TcpStream;
type Error = ();
type InitError = ();
type Service = DefaultAcceptor;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
ok(DefaultAcceptor)
}
}
impl Service for DefaultAcceptor {
type Request = TcpStream;
type Response = TcpStream;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
ok(req)
}
}

View File

@ -89,8 +89,8 @@ where
H: HttpHandler + 'static,
{
pub fn new(
settings: WorkerSettings<H>, stream: T, addr: Option<SocketAddr>,
buf: BytesMut, is_eof: bool, keepalive_timer: Option<Delay>,
settings: WorkerSettings<H>, stream: T, addr: Option<SocketAddr>, buf: BytesMut,
is_eof: bool, keepalive_timer: Option<Delay>,
) -> Self {
Http1 {
flags: if is_eof {
@ -379,10 +379,7 @@ where
fn push_response_entry(&mut self, status: StatusCode) {
self.tasks.push_back(Entry {
pipe: EntryPipe::Error(ServerError::err(
Version::HTTP_11,
status,
)),
pipe: EntryPipe::Error(ServerError::err(Version::HTTP_11, status)),
flags: EntryFlags::empty(),
});
}

View File

@ -1,13 +1,10 @@
use std::marker::PhantomData;
use std::{io, mem, net, time};
use std::{io, mem, net};
use actix::{Actor, Addr, AsyncContext, Context, Handler, System};
use actix_net::server::{Server, ServerServiceFactory};
use actix_net::service::{NewService, NewServiceExt, Service};
use actix::{Addr, System};
use actix_net::server;
use actix_net::service::NewService;
use actix_net::ssl;
use futures::future::{ok, FutureResult};
use futures::{Async, Poll, Stream};
use net2::TcpBuilder;
use num_cpus;
use tokio_tcp::TcpStream;
@ -21,9 +18,9 @@ use openssl::ssl::SslAcceptorBuilder;
//#[cfg(feature = "rust-tls")]
//use rustls::ServerConfig;
use super::channel::HttpChannel;
use super::settings::{ServerSettings, WorkerSettings};
use super::{HttpHandler, IntoHttpHandler, IoStream, KeepAlive};
use super::builder::{AcceptorServiceFactory, HttpServiceBuilder, ServiceFactory};
use super::builder::{DefaultAcceptor, DefaultPipelineFactory};
use super::{IntoHttpHandler, IoStream, KeepAlive};
struct Socket<H: IntoHttpHandler> {
scheme: &'static str,
@ -205,17 +202,16 @@ where
lst,
addr,
scheme: "http",
handler: Box::new(SimpleFactory {
addr,
factory: self.factory.clone(),
pipeline: DefaultPipelineFactory {
handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(),
DefaultAcceptor,
DefaultPipelineFactory::new(
self.factory.clone(),
self.host.clone(),
addr,
factory: self.factory.clone(),
host: self.host.clone(),
keep_alive: self.keep_alive,
_t: PhantomData,
},
}),
self.keep_alive,
),
)),
});
self
@ -239,6 +235,7 @@ where
addr,
scheme: "https",
handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(),
acceptor,
DefaultPipelineFactory::new(
self.factory.clone(),
@ -346,6 +343,7 @@ where
addr,
scheme: "https",
handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(),
acceptor.clone(),
DefaultPipelineFactory::new(
self.factory.clone(),
@ -493,10 +491,10 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
/// sys.run(); // <- Run actix system, this method starts all async processes
/// }
/// ```
pub fn start(mut self) -> Addr<Server> {
pub fn start(mut self) -> Addr<server::Server> {
ssl::max_concurrent_ssl_connect(self.maxconnrate);
let mut srv = Server::new()
let mut srv = server::Server::new()
.workers(self.threads)
.maxconn(self.maxconn)
.shutdown_timeout(self.shutdown_timeout);
@ -605,143 +603,6 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
// }
// }
struct HttpService<F, H, Io>
where
F: Fn() -> H,
H: IntoHttpHandler,
Io: IoStream,
{
factory: F,
addr: net::SocketAddr,
host: Option<String>,
keep_alive: KeepAlive,
_t: PhantomData<Io>,
}
impl<F, H, Io> NewService for HttpService<F, H, Io>
where
F: Fn() -> H,
H: IntoHttpHandler,
Io: IoStream,
{
type Request = Io;
type Response = ();
type Error = ();
type InitError = ();
type Service = HttpServiceHandler<H::Handler, Io>;
type Future = FutureResult<Self::Service, Self::Error>;
fn new_service(&self) -> Self::Future {
let s = ServerSettings::new(Some(self.addr), &self.host, false);
let app = (self.factory)().into_handler();
ok(HttpServiceHandler::new(app, self.keep_alive, s))
}
}
struct HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
settings: WorkerSettings<H>,
tcp_ka: Option<time::Duration>,
_t: PhantomData<Io>,
}
impl<H, Io> HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
fn new(
app: H, keep_alive: KeepAlive, settings: ServerSettings,
) -> HttpServiceHandler<H, Io> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
let settings = WorkerSettings::new(app, keep_alive, settings);
HttpServiceHandler {
tcp_ka,
settings,
_t: PhantomData,
}
}
}
impl<H, Io> Service for HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
type Request = Io;
type Response = ();
type Error = ();
type Future = HttpChannel<Io, H>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, mut req: Self::Request) -> Self::Future {
let _ = req.set_nodelay(true);
HttpChannel::new(self.settings.clone(), req, None)
}
// fn shutdown(&self, force: bool) {
// if force {
// self.settings.head().traverse::<TcpStream, H>();
// }
// }
}
trait ServiceFactory<H>
where
H: IntoHttpHandler,
{
fn register(&self, server: Server, lst: net::TcpListener) -> Server;
}
struct SimpleFactory<H, F, P>
where
H: IntoHttpHandler,
F: Fn() -> H + Send + Clone,
P: HttpPipelineFactory<Io = TcpStream>,
{
pub addr: net::SocketAddr,
pub factory: F,
pub pipeline: P,
}
impl<H: IntoHttpHandler, F, P> Clone for SimpleFactory<H, F, P>
where
P: HttpPipelineFactory<Io = TcpStream>,
F: Fn() -> H + Send + Clone,
{
fn clone(&self) -> Self {
SimpleFactory {
addr: self.addr,
factory: self.factory.clone(),
pipeline: self.pipeline.clone(),
}
}
}
impl<H, F, P> ServiceFactory<H> for SimpleFactory<H, F, P>
where
H: IntoHttpHandler + 'static,
F: Fn() -> H + Send + Clone + 'static,
P: HttpPipelineFactory<Io = TcpStream>,
{
fn register(&self, server: Server, lst: net::TcpListener) -> Server {
let pipeline = self.pipeline.clone();
server.listen(lst, move || pipeline.create())
}
}
fn create_tcp_listener(
addr: net::SocketAddr, backlog: i32,
) -> io::Result<net::TcpListener> {
@ -753,183 +614,3 @@ fn create_tcp_listener(
builder.bind(addr)?;
Ok(builder.listen(backlog)?)
}
pub struct HttpServiceBuilder<H, A, P> {
acceptor: A,
pipeline: P,
t: PhantomData<H>,
}
impl<H, A, P> HttpServiceBuilder<H, A, P>
where
A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>,
H: IntoHttpHandler,
{
pub fn new(acceptor: A, pipeline: P) -> Self {
Self {
acceptor,
pipeline,
t: PhantomData,
}
}
pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<H, A1, P>
where
A1: AcceptorServiceFactory,
{
HttpServiceBuilder {
acceptor,
pipeline: self.pipeline,
t: PhantomData,
}
}
pub fn pipeline<P1>(self, pipeline: P1) -> HttpServiceBuilder<H, A, P1>
where
P1: HttpPipelineFactory,
{
HttpServiceBuilder {
pipeline,
acceptor: self.acceptor,
t: PhantomData,
}
}
fn finish(&self) -> impl ServerServiceFactory {
let acceptor = self.acceptor.clone();
let pipeline = self.pipeline.clone();
move || acceptor.create().and_then(pipeline.create())
}
}
impl<H, A, P> ServiceFactory<H> for HttpServiceBuilder<H, A, P>
where
A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>,
H: IntoHttpHandler,
{
fn register(&self, server: Server, lst: net::TcpListener) -> Server {
server.listen(lst, self.finish())
}
}
pub trait AcceptorServiceFactory: Send + Clone + 'static {
type Io: IoStream + Send;
type NewService: NewService<
Request = TcpStream,
Response = Self::Io,
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> AcceptorServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T::Response: IoStream + Send,
T: NewService<Request = TcpStream, Error = (), InitError = ()>,
{
type Io = T::Response;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
pub trait HttpPipelineFactory: Send + Clone + 'static {
type Io: IoStream;
type NewService: NewService<
Request = Self::Io,
Response = (),
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> HttpPipelineFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<Response = (), Error = (), InitError = ()>,
T::Request: IoStream,
{
type Io = T::Request;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
struct DefaultPipelineFactory<F, H, Io>
where
F: Fn() -> H + Send + Clone,
{
factory: F,
host: Option<String>,
addr: net::SocketAddr,
keep_alive: KeepAlive,
_t: PhantomData<Io>,
}
impl<F, H, Io> DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
fn new(
factory: F, host: Option<String>, addr: net::SocketAddr, keep_alive: KeepAlive,
) -> Self {
Self {
factory,
addr,
keep_alive,
host,
_t: PhantomData,
}
}
}
impl<F, H, Io> Clone for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream,
F: Fn() -> H + Send + Clone,
H: IntoHttpHandler,
{
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
addr: self.addr,
keep_alive: self.keep_alive,
host: self.host.clone(),
_t: PhantomData,
}
}
}
impl<F, H, Io> HttpPipelineFactory for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
type Io = Io;
type NewService = HttpService<F, H, Io>;
fn create(&self) -> Self::NewService {
HttpService {
addr: self.addr,
keep_alive: self.keep_alive,
host: self.host.clone(),
factory: self.factory.clone(),
_t: PhantomData,
}
}
}

View File

@ -117,6 +117,7 @@ use tokio_tcp::TcpStream;
pub use actix_net::server::{PauseServer, ResumeServer, StopServer};
pub(crate) mod builder;
mod channel;
mod error;
pub(crate) mod h1;
@ -130,6 +131,7 @@ mod http;
pub(crate) mod input;
pub(crate) mod message;
pub(crate) mod output;
pub(crate) mod service;
pub(crate) mod settings;
mod ssl;

133
src/server/service.rs Normal file
View File

@ -0,0 +1,133 @@
use std::marker::PhantomData;
use std::net;
use std::time::Duration;
use actix_net::service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use super::channel::HttpChannel;
use super::handler::{HttpHandler, IntoHttpHandler};
use super::settings::{ServerSettings, WorkerSettings};
use super::{IoStream, KeepAlive};
pub enum HttpServiceMessage<T> {
/// New stream
Connect(T),
/// Gracefull shutdown
Shutdown(Duration),
/// Force shutdown
ForceShutdown,
}
pub(crate) struct HttpService<F, H, Io>
where
F: Fn() -> H,
H: IntoHttpHandler,
Io: IoStream,
{
factory: F,
addr: net::SocketAddr,
host: Option<String>,
keep_alive: KeepAlive,
_t: PhantomData<Io>,
}
impl<F, H, Io> HttpService<F, H, Io>
where
F: Fn() -> H,
H: IntoHttpHandler,
Io: IoStream,
{
pub fn new(
factory: F, addr: net::SocketAddr, host: Option<String>, keep_alive: KeepAlive,
) -> Self {
HttpService {
factory,
addr,
host,
keep_alive,
_t: PhantomData,
}
}
}
impl<F, H, Io> NewService for HttpService<F, H, Io>
where
F: Fn() -> H,
H: IntoHttpHandler,
Io: IoStream,
{
type Request = Io;
type Response = ();
type Error = ();
type InitError = ();
type Service = HttpServiceHandler<H::Handler, Io>;
type Future = FutureResult<Self::Service, Self::Error>;
fn new_service(&self) -> Self::Future {
let s = ServerSettings::new(Some(self.addr), &self.host, false);
let app = (self.factory)().into_handler();
ok(HttpServiceHandler::new(app, self.keep_alive, s))
}
}
pub(crate) struct HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
settings: WorkerSettings<H>,
tcp_ka: Option<Duration>,
_t: PhantomData<Io>,
}
impl<H, Io> HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
fn new(
app: H, keep_alive: KeepAlive, settings: ServerSettings,
) -> HttpServiceHandler<H, Io> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(Duration::new(val as u64, 0))
} else {
None
};
let settings = WorkerSettings::new(app, keep_alive, settings);
HttpServiceHandler {
tcp_ka,
settings,
_t: PhantomData,
}
}
}
impl<H, Io> Service for HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
{
type Request = Io;
type Response = ();
type Error = ();
type Future = HttpChannel<Io, H>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, mut req: Self::Request) -> Self::Future {
let _ = req.set_nodelay(true);
HttpChannel::new(self.settings.clone(), req, None)
}
// fn shutdown(&self, force: bool) {
// if force {
// self.settings.head().traverse::<TcpStream, H>();
// }
// }
}

View File

@ -2,7 +2,7 @@ use std::cell::{RefCell, RefMut, UnsafeCell};
use std::collections::VecDeque;
use std::fmt::Write;
use std::rc::Rc;
use std::time::{Instant, Duration};
use std::time::{Duration, Instant};
use std::{env, fmt, net};
use bytes::BytesMut;
@ -12,8 +12,8 @@ use http::StatusCode;
use lazycell::LazyCell;
use parking_lot::Mutex;
use time;
use tokio_timer::{sleep, Delay};
use tokio_current_thread::spawn;
use tokio_timer::{sleep, Delay};
use super::channel::Node;
use super::message::{Request, RequestPool};
@ -183,9 +183,7 @@ impl<H> WorkerSettings<H> {
pub fn keep_alive_timer(&self) -> Option<Delay> {
let ka = self.0.keep_alive;
if ka != 0 {
Some(Delay::new(
Instant::now() + Duration::from_secs(ka),
))
Some(Delay::new(Instant::now() + Duration::from_secs(ka)))
} else {
None
}

View File

@ -10,9 +10,9 @@ extern crate http as modhttp;
extern crate rand;
extern crate tokio;
extern crate tokio_current_thread;
extern crate tokio_current_thread as current_thread;
extern crate tokio_reactor;
extern crate tokio_tcp;
extern crate tokio_current_thread as current_thread;
use std::io::{Read, Write};
use std::sync::Arc;