1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-24 00:21:08 +01:00

Simplify lifetime annotation in HttpServiceBuilder. Simplify PlStream (#2129)

This commit is contained in:
fakeshadow 2021-03-30 07:46:09 -07:00 committed by GitHub
parent f66774e30b
commit c49fe79207
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 32 deletions

View File

@ -63,11 +63,9 @@ where
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Error: Into<Error>, X::Error: Into<Error>,
X::InitError: fmt::Debug, X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>, U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U::Error: fmt::Display, U::Error: fmt::Display,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<T, Codec>)>>::Future: 'static,
{ {
/// Set server keep-alive setting. /// Set server keep-alive setting.
/// ///
@ -127,7 +125,6 @@ where
X1: ServiceFactory<Request, Config = (), Response = Request>, X1: ServiceFactory<Request, Config = (), Response = Request>,
X1::Error: Into<Error>, X1::Error: Into<Error>,
X1::InitError: fmt::Debug, X1::InitError: fmt::Debug,
<X1::Service as Service<Request>>::Future: 'static,
{ {
HttpServiceBuilder { HttpServiceBuilder {
keep_alive: self.keep_alive, keep_alive: self.keep_alive,
@ -152,7 +149,6 @@ where
U1: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>, U1: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U1::Error: fmt::Display, U1::Error: fmt::Display,
U1::InitError: fmt::Debug, U1::InitError: fmt::Debug,
<U1::Service as Service<(Request, Framed<T, Codec>)>>::Future: 'static,
{ {
HttpServiceBuilder { HttpServiceBuilder {
keep_alive: self.keep_alive, keep_alive: self.keep_alive,
@ -211,7 +207,6 @@ where
S::Error: Into<Error> + 'static, S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,
@ -233,7 +228,6 @@ where
S::Error: Into<Error> + 'static, S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,

View File

@ -7,7 +7,7 @@ use std::{
use actix_codec::Framed; use actix_codec::Framed;
use bytes::buf::BufMut; use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::{ready, Stream};
use futures_util::{future::poll_fn, SinkExt as _}; use futures_util::{future::poll_fn, SinkExt as _};
use crate::error::PayloadError; use crate::error::PayloadError;
@ -17,7 +17,7 @@ use crate::http::{
StatusCode, StatusCode,
}; };
use crate::message::{RequestHeadType, ResponseHead}; use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::{Payload, PayloadStream}; use crate::payload::Payload;
use super::connection::{ConnectionIo, H1Connection}; use super::connection::{ConnectionIo, H1Connection};
use super::error::{ConnectError, SendRequestError}; use super::error::{ConnectError, SendRequestError};
@ -122,10 +122,7 @@ where
Ok((head, Payload::None)) Ok((head, Payload::None))
} }
_ => { _ => Ok((head, Payload::Stream(Box::pin(PlStream::new(framed))))),
let pl: PayloadStream = Box::pin(PlStream::new(framed));
Ok((head, pl.into()))
}
} }
} }
@ -194,21 +191,16 @@ where
} }
#[pin_project::pin_project] #[pin_project::pin_project]
pub(crate) struct PlStream<Io: ConnectionIo> pub(crate) struct PlStream<Io: ConnectionIo> {
where
Io: ConnectionIo,
{
#[pin] #[pin]
framed: Option<Framed<H1Connection<Io>, h1::ClientPayloadCodec>>, framed: Framed<H1Connection<Io>, h1::ClientPayloadCodec>,
} }
impl<Io: ConnectionIo> PlStream<Io> { impl<Io: ConnectionIo> PlStream<Io> {
fn new(framed: Framed<H1Connection<Io>, h1::ClientCodec>) -> Self { fn new(framed: Framed<H1Connection<Io>, h1::ClientCodec>) -> Self {
let framed = framed.into_map_codec(|codec| codec.into_payload_codec()); let framed = framed.into_map_codec(|codec| codec.into_payload_codec());
PlStream { PlStream { framed }
framed: Some(framed),
}
} }
} }
@ -219,20 +211,16 @@ impl<Io: ConnectionIo> Stream for PlStream<Io> {
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
let mut framed = self.project().framed.as_pin_mut().unwrap(); let mut this = self.project();
match framed.as_mut().next_item(cx)? { match ready!(this.framed.as_mut().next_item(cx)?) {
Poll::Pending => Poll::Pending, Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))),
Poll::Ready(Some(chunk)) => { Some(None) => {
if let Some(chunk) = chunk { let keep_alive = this.framed.codec_ref().keepalive();
Poll::Ready(Some(Ok(chunk))) this.framed.io_mut().on_release(keep_alive);
} else {
let keep_alive = framed.codec_ref().keepalive();
framed.io_mut().on_release(keep_alive);
Poll::Ready(None) Poll::Ready(None)
} }
} None => Poll::Ready(None),
Poll::Ready(None) => Poll::Ready(None),
} }
} }
} }