1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

added H1SimpleService

This commit is contained in:
Nikolay Kim 2018-10-15 15:56:47 -07:00
parent d39c018c93
commit 3c402a55da
3 changed files with 110 additions and 3 deletions

View File

@ -103,6 +103,17 @@ impl Codec {
self.flags.contains(Flags::KEEPALIVE)
}
/// Check last request's message type
pub fn message_type(&self) -> InMessageType {
if self.flags.contains(Flags::UNHANDLED) {
InMessageType::Unhandled
} else if self.payload.is_none() {
InMessageType::None
} else {
InMessageType::Payload
}
}
/// prepare transfer encoding
pub fn prepare_te(&mut self, res: &mut Response) {
self.te
@ -275,6 +286,7 @@ impl Decoder for Codec {
}
RequestPayloadType::Unhandled => {
self.payload = None;
self.flags.insert(Flags::UNHANDLED);
InMessageType::Unhandled
}
};

View File

@ -10,7 +10,7 @@ mod service;
pub use self::codec::{Codec, InMessage, InMessageType, OutMessage};
pub use self::decoder::{PayloadDecoder, RequestDecoder};
pub use self::dispatcher::Dispatcher;
pub use self::service::{H1Service, H1ServiceHandler};
pub use self::service::{H1Service, H1ServiceHandler, H1SimpleService};
use request::Request;

View File

@ -2,15 +2,18 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::net;
use actix_net::codec::Framed;
use actix_net::service::{IntoNewService, NewService, Service};
use futures::{Async, Future, Poll};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use config::{KeepAlive, ServiceConfig};
use error::DispatchError;
use error::{DispatchError, ParseError};
use request::Request;
use response::Response;
use super::codec::{Codec, InMessage};
use super::dispatcher::Dispatcher;
use super::H1ServiceResult;
@ -191,6 +194,7 @@ where
}
}
#[doc(hidden)]
pub struct H1ServiceResponse<T, S: NewService> {
fut: S::Future,
cfg: Option<ServiceConfig>,
@ -256,3 +260,94 @@ where
Dispatcher::new(req, self.cfg.clone(), self.srv.clone())
}
}
/// `NewService` implementation for `H1SimpleServiceHandler` service
pub struct H1SimpleService<T> {
config: ServiceConfig,
_t: PhantomData<T>,
}
impl<T> H1SimpleService<T> {
/// Create new `H1SimpleService` instance.
pub fn new() -> Self {
H1SimpleService {
config: ServiceConfig::default(),
_t: PhantomData,
}
}
}
impl<T> NewService for H1SimpleService<T>
where
T: AsyncRead + AsyncWrite,
{
type Request = T;
type Response = (Request, Framed<T, Codec>);
type Error = ParseError;
type InitError = ();
type Service = H1SimpleServiceHandler<T>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
ok(H1SimpleServiceHandler {
config: self.config.clone(),
_t: PhantomData,
})
}
}
/// `Service` implementation for HTTP1 transport. Reads one request and returns
/// request and framed object.
pub struct H1SimpleServiceHandler<T> {
config: ServiceConfig,
_t: PhantomData<T>,
}
impl<T> Service for H1SimpleServiceHandler<T>
where
T: AsyncRead + AsyncWrite,
{
type Request = T;
type Response = (Request, Framed<T, Codec>);
type Error = ParseError;
type Future = H1SimpleServiceHandlerResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
H1SimpleServiceHandlerResponse {
framed: Some(Framed::new(req, Codec::new(self.config.clone()))),
}
}
}
#[doc(hidden)]
pub struct H1SimpleServiceHandlerResponse<T>
where
T: AsyncRead + AsyncWrite,
{
framed: Option<Framed<T, Codec>>,
}
impl<T> Future for H1SimpleServiceHandlerResponse<T>
where
T: AsyncRead + AsyncWrite,
{
type Item = (Request, Framed<T, Codec>);
type Error = ParseError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.framed.as_mut().unwrap().poll()? {
Async::Ready(Some(req)) => match req {
InMessage::Message(req, _) => {
Ok(Async::Ready((req, self.framed.take().unwrap())))
}
InMessage::Chunk(_) => unreachable!("Something is wrong"),
},
Async::Ready(None) => Err(ParseError::Incomplete),
Async::NotReady => Ok(Async::NotReady),
}
}
}