1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-07-31 07:23:57 +02:00

update tests

This commit is contained in:
Nikolay Kim
2019-03-11 16:42:33 -07:00
parent ad43ca735b
commit e15e4f18fd
6 changed files with 383 additions and 96 deletions

View File

@@ -6,10 +6,11 @@ use futures::future::{err, Either};
use futures::{Async, Future, Poll};
use h2::{client::SendRequest, SendStream};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use http::{request::Request, HttpTryFrom, Version};
use http::{request::Request, HttpTryFrom, Method, Version};
use crate::body::{BodyLength, MessageBody};
use crate::message::{Message, RequestHead, ResponseHead};
use crate::payload::Payload;
use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError;
@@ -28,6 +29,7 @@ where
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.length());
let head_req = head.method == Method::HEAD;
let length = body.length();
let eof = match length {
BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => true,
@@ -99,18 +101,16 @@ where
}
}
})
.and_then(|resp| {
.and_then(move |resp| {
let (parts, body) = resp.into_parts();
let payload = if head_req { Payload::None } else { body.into() };
let mut head: Message<ResponseHead> = Message::new();
head.version = parts.version;
head.status = parts.status;
head.headers = parts.headers;
Ok(ClientResponse {
head,
payload: body.into(),
})
Ok(ClientResponse { head, payload })
})
.from_err()
}

View File

@@ -228,18 +228,21 @@ where
}
None => None,
},
State::ServiceCall(mut fut) => {
match fut.poll().map_err(|_| DispatchError::Service)? {
Async::Ready(res) => {
let (res, body) = res.into().replace_body(());
Some(self.send_response(res, body)?)
}
Async::NotReady => {
self.state = State::ServiceCall(fut);
None
}
State::ServiceCall(mut fut) => match fut.poll() {
Ok(Async::Ready(res)) => {
let (res, body) = res.into().replace_body(());
Some(self.send_response(res, body)?)
}
}
Ok(Async::NotReady) => {
self.state = State::ServiceCall(fut);
None
}
Err(_e) => {
let res: Response = Response::InternalServerError().finish();
let (res, body) = res.replace_body(());
Some(self.send_response(res, body.into_body())?)
}
},
State::SendPayload(mut stream) => {
loop {
if !self.framed.is_write_buf_full() {
@@ -289,12 +292,17 @@ where
fn handle_request(&mut self, req: Request) -> Result<State<S, B>, DispatchError> {
let mut task = self.service.call(req);
match task.poll().map_err(|_| DispatchError::Service)? {
Async::Ready(res) => {
match task.poll() {
Ok(Async::Ready(res)) => {
let (res, body) = res.into().replace_body(());
self.send_response(res, body)
}
Async::NotReady => Ok(State::ServiceCall(task)),
Ok(Async::NotReady) => Ok(State::ServiceCall(task)),
Err(_e) => {
let res: Response = Response::InternalServerError().finish();
let (res, body) = res.replace_body(());
self.send_response(res, body.into_body())
}
}
}

View File

@@ -107,9 +107,7 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.connection.poll()? {
Async::Ready(None) => {
self.flags.insert(Flags::DISCONNECTED);
}
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some((req, res))) => {
// update keep-alive expire
if self.ka_timer.is_some() {
@@ -255,7 +253,7 @@ where
}
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
Err(_e) => {
let res: Response = Response::InternalServerError().finish();
let (res, body) = res.replace_body(());
@@ -304,7 +302,9 @@ where
}
} else {
match body.poll_next() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Ok(Async::Ready(None)) => {
if let Err(e) = stream.send_data(Bytes::new(), true) {
warn!("{:?}", e);

View File

@@ -40,7 +40,10 @@ impl Stream for Payload {
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
Err(err) => {
println!("======== {:?}", err);
Err(err.into())
}
}
}
}