mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-01 16:55:08 +02:00
update tests, remove unused deps
This commit is contained in:
@ -48,12 +48,17 @@ where
|
||||
|
||||
state: State<S>,
|
||||
payload: Option<PayloadSender>,
|
||||
messages: VecDeque<Request>,
|
||||
messages: VecDeque<Message>,
|
||||
|
||||
ka_expire: Instant,
|
||||
ka_timer: Option<Delay>,
|
||||
}
|
||||
|
||||
enum Message {
|
||||
Item(Request),
|
||||
Error(Response),
|
||||
}
|
||||
|
||||
enum State<S: Service> {
|
||||
None,
|
||||
Response(S::Future),
|
||||
@ -131,32 +136,11 @@ where
|
||||
}
|
||||
|
||||
// if checked is set to true, delay disconnect until all tasks have finished.
|
||||
fn client_disconnected(&mut self, _checked: bool) {
|
||||
fn client_disconnected(&mut self) {
|
||||
self.flags.insert(Flags::READ_DISCONNECTED);
|
||||
if let Some(mut payload) = self.payload.take() {
|
||||
payload.set_error(PayloadError::Incomplete);
|
||||
}
|
||||
|
||||
// if !checked || self.tasks.is_empty() {
|
||||
// self.flags
|
||||
// .insert(Flags::WRITE_DISCONNECTED | Flags::FLUSHED);
|
||||
|
||||
// // notify tasks
|
||||
// for mut task in self.tasks.drain(..) {
|
||||
// task.disconnected();
|
||||
// match task.poll_completed() {
|
||||
// Ok(Async::NotReady) => {
|
||||
// // spawn not completed task, it does not require access to io
|
||||
// // at this point
|
||||
// spawn(HttpHandlerTaskFut::new(task.into_task()));
|
||||
// }
|
||||
// Ok(Async::Ready(_)) => (),
|
||||
// Err(err) => {
|
||||
// error!("Unhandled application error: {}", err);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
/// Flush stream
|
||||
@ -166,7 +150,7 @@ where
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(err) => {
|
||||
debug!("Error sending data: {}", err);
|
||||
self.client_disconnected(false);
|
||||
self.client_disconnected();
|
||||
Err(err.into())
|
||||
}
|
||||
Ok(Async::Ready(_)) => {
|
||||
@ -192,19 +176,28 @@ where
|
||||
let state = match self.state {
|
||||
State::None => loop {
|
||||
break if let Some(msg) = self.messages.pop_front() {
|
||||
let mut task = self.service.call(msg);
|
||||
match task.poll() {
|
||||
Ok(Async::Ready(res)) => {
|
||||
if res.body().is_streaming() {
|
||||
unimplemented!()
|
||||
} else {
|
||||
Some(Ok(State::SendResponse(Some(
|
||||
OutMessage::Response(res),
|
||||
))))
|
||||
match msg {
|
||||
Message::Item(msg) => {
|
||||
let mut task = self.service.call(msg);
|
||||
match task.poll() {
|
||||
Ok(Async::Ready(res)) => {
|
||||
if res.body().is_streaming() {
|
||||
unimplemented!()
|
||||
} else {
|
||||
Some(Ok(State::SendResponse(Some(
|
||||
OutMessage::Response(res),
|
||||
))))
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
Some(Ok(State::Response(task)))
|
||||
}
|
||||
Err(err) => Some(Err(DispatchError::Service(err))),
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => Some(Ok(State::Response(task))),
|
||||
Err(err) => Some(Err(DispatchError::Service(err))),
|
||||
Message::Error(res) => Some(Ok(State::SendResponse(Some(
|
||||
OutMessage::Response(res),
|
||||
)))),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
@ -249,7 +242,8 @@ where
|
||||
}
|
||||
}
|
||||
State::SendResponseWithPayload(ref mut item) => {
|
||||
let (msg, body) = item.take().expect("SendResponse is empty");
|
||||
let (msg, body) =
|
||||
item.take().expect("SendResponseWithPayload is empty");
|
||||
match self.framed.start_send(msg) {
|
||||
Ok(AsyncSink::Ready) => {
|
||||
self.flags.set(
|
||||
@ -271,8 +265,7 @@ where
|
||||
match state {
|
||||
Some(Ok(state)) => self.state = state,
|
||||
Some(Err(err)) => {
|
||||
// error!("Unhandled error1: {}", err);
|
||||
self.client_disconnected(false);
|
||||
self.client_disconnected();
|
||||
return Err(err);
|
||||
}
|
||||
None => {
|
||||
@ -310,12 +303,12 @@ where
|
||||
Ok(Async::NotReady) => self.state = State::Response(task),
|
||||
Err(err) => {
|
||||
error!("Unhandled application error: {}", err);
|
||||
self.client_disconnected(false);
|
||||
self.client_disconnected();
|
||||
return Err(DispatchError::Service(err));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.messages.push_back(msg);
|
||||
self.messages.push_back(Message::Item(msg));
|
||||
}
|
||||
}
|
||||
InMessage::MessageWithPayload(msg) => {
|
||||
@ -324,7 +317,7 @@ where
|
||||
*msg.inner.payload.borrow_mut() = Some(pl);
|
||||
self.payload = Some(ps);
|
||||
|
||||
self.messages.push_back(msg);
|
||||
self.messages.push_back(Message::Item(msg));
|
||||
}
|
||||
InMessage::Chunk(chunk) => {
|
||||
if let Some(ref mut payload) = self.payload {
|
||||
@ -332,7 +325,9 @@ where
|
||||
} else {
|
||||
error!("Internal server error: unexpected payload chunk");
|
||||
self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED);
|
||||
// self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
self.messages.push_back(Message::Error(
|
||||
Response::InternalServerError().finish(),
|
||||
));
|
||||
self.error = Some(DispatchError::InternalError);
|
||||
}
|
||||
}
|
||||
@ -342,7 +337,9 @@ where
|
||||
} else {
|
||||
error!("Internal server error: unexpected eof");
|
||||
self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED);
|
||||
// self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
self.messages.push_back(Message::Error(
|
||||
Response::InternalServerError().finish(),
|
||||
));
|
||||
self.error = Some(DispatchError::InternalError);
|
||||
}
|
||||
}
|
||||
@ -363,7 +360,7 @@ where
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
if self.flags.contains(Flags::READ_DISCONNECTED) {
|
||||
self.client_disconnected(true);
|
||||
self.client_disconnected();
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -378,7 +375,8 @@ where
|
||||
}
|
||||
|
||||
// Malformed requests should be responded with 400
|
||||
// self.push_response_entry(StatusCode::BAD_REQUEST);
|
||||
self.messages
|
||||
.push_back(Message::Error(Response::BadRequest().finish()));
|
||||
self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED);
|
||||
self.error = Some(DispatchError::MalformedRequest);
|
||||
break;
|
||||
|
Reference in New Issue
Block a user