diff --git a/Cargo.toml b/Cargo.toml index 455b61488..48f199ed8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,6 @@ time = "0.1" encoding = "0.2" lazy_static = "1.0" serde_urlencoded = "^0.5.3" -url = { version="1.7", features=["query_encoding"] } cookie = { version="0.11", features=["percent-encode"] } failure = "^0.1.2" @@ -81,7 +80,6 @@ tokio = "0.1" tokio-io = "0.1" tokio-tcp = "0.1" tokio-timer = "0.2" -tokio-reactor = "0.1" tokio-current-thread = "0.1" # native-tls diff --git a/src/error.rs b/src/error.rs index dc2d45b84..26b3ca56d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,7 +16,6 @@ use serde::de::value::Error as DeError; use serde_json::error::Error as JsonError; use serde_urlencoded::ser::Error as FormError; use tokio_timer::Error as TimerError; -pub use url::ParseError as UrlParseError; // re-exports pub use cookie::ParseError as CookieParseError; @@ -196,9 +195,6 @@ impl ResponseError for FormError {} /// `InternalServerError` for `TimerError` impl ResponseError for TimerError {} -/// `InternalServerError` for `UrlParseError` -impl ResponseError for UrlParseError {} - /// Return `BAD_REQUEST` for `de::value::Error` impl ResponseError for DeError { fn error_response(&self) -> Response { @@ -552,29 +548,6 @@ impl From for ReadlinesError { } } -/// Errors which can occur when attempting to generate resource uri. -#[derive(Fail, Debug, PartialEq)] -pub enum UrlGenerationError { - /// Resource not found - #[fail(display = "Resource not found")] - ResourceNotFound, - /// Not all path pattern covered - #[fail(display = "Not all path pattern covered")] - NotEnoughElements, - /// URL parse error - #[fail(display = "{}", _0)] - ParseError(#[cause] UrlParseError), -} - -/// `InternalServerError` for `UrlGeneratorError` -impl ResponseError for UrlGenerationError {} - -impl From for UrlGenerationError { - fn from(err: UrlParseError) -> Self { - UrlGenerationError::ParseError(err) - } -} - /// Helper type that can wrap any error and generate custom response. /// /// In following example any `io::Error` will be converted into "BAD REQUEST" diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index f20013020..a39967a22 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -48,12 +48,17 @@ where state: State, payload: Option, - messages: VecDeque, + messages: VecDeque, ka_expire: Instant, ka_timer: Option, } +enum Message { + Item(Request), + Error(Response), +} + enum State { None, Response(S::Future), @@ -131,32 +136,11 @@ where } // if checked is set to true, delay disconnect until all tasks have finished. - fn client_disconnected(&mut self, _checked: bool) { + fn client_disconnected(&mut self) { self.flags.insert(Flags::READ_DISCONNECTED); if let Some(mut payload) = self.payload.take() { payload.set_error(PayloadError::Incomplete); } - - // if !checked || self.tasks.is_empty() { - // self.flags - // .insert(Flags::WRITE_DISCONNECTED | Flags::FLUSHED); - - // // notify tasks - // for mut task in self.tasks.drain(..) { - // task.disconnected(); - // match task.poll_completed() { - // Ok(Async::NotReady) => { - // // spawn not completed task, it does not require access to io - // // at this point - // spawn(HttpHandlerTaskFut::new(task.into_task())); - // } - // Ok(Async::Ready(_)) => (), - // Err(err) => { - // error!("Unhandled application error: {}", err); - // } - // } - // } - // } } /// Flush stream @@ -166,7 +150,7 @@ where Ok(Async::NotReady) => Ok(Async::NotReady), Err(err) => { debug!("Error sending data: {}", err); - self.client_disconnected(false); + self.client_disconnected(); Err(err.into()) } Ok(Async::Ready(_)) => { @@ -192,19 +176,28 @@ where let state = match self.state { State::None => loop { break if let Some(msg) = self.messages.pop_front() { - let mut task = self.service.call(msg); - match task.poll() { - Ok(Async::Ready(res)) => { - if res.body().is_streaming() { - unimplemented!() - } else { - Some(Ok(State::SendResponse(Some( - OutMessage::Response(res), - )))) + match msg { + Message::Item(msg) => { + let mut task = self.service.call(msg); + match task.poll() { + Ok(Async::Ready(res)) => { + if res.body().is_streaming() { + unimplemented!() + } else { + Some(Ok(State::SendResponse(Some( + OutMessage::Response(res), + )))) + } + } + Ok(Async::NotReady) => { + Some(Ok(State::Response(task))) + } + Err(err) => Some(Err(DispatchError::Service(err))), } } - Ok(Async::NotReady) => Some(Ok(State::Response(task))), - Err(err) => Some(Err(DispatchError::Service(err))), + Message::Error(res) => Some(Ok(State::SendResponse(Some( + OutMessage::Response(res), + )))), } } else { None @@ -249,7 +242,8 @@ where } } State::SendResponseWithPayload(ref mut item) => { - let (msg, body) = item.take().expect("SendResponse is empty"); + let (msg, body) = + item.take().expect("SendResponseWithPayload is empty"); match self.framed.start_send(msg) { Ok(AsyncSink::Ready) => { self.flags.set( @@ -271,8 +265,7 @@ where match state { Some(Ok(state)) => self.state = state, Some(Err(err)) => { - // error!("Unhandled error1: {}", err); - self.client_disconnected(false); + self.client_disconnected(); return Err(err); } None => { @@ -310,12 +303,12 @@ where Ok(Async::NotReady) => self.state = State::Response(task), Err(err) => { error!("Unhandled application error: {}", err); - self.client_disconnected(false); + self.client_disconnected(); return Err(DispatchError::Service(err)); } } } else { - self.messages.push_back(msg); + self.messages.push_back(Message::Item(msg)); } } InMessage::MessageWithPayload(msg) => { @@ -324,7 +317,7 @@ where *msg.inner.payload.borrow_mut() = Some(pl); self.payload = Some(ps); - self.messages.push_back(msg); + self.messages.push_back(Message::Item(msg)); } InMessage::Chunk(chunk) => { if let Some(ref mut payload) = self.payload { @@ -332,7 +325,9 @@ where } else { error!("Internal server error: unexpected payload chunk"); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); - // self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); + self.messages.push_back(Message::Error( + Response::InternalServerError().finish(), + )); self.error = Some(DispatchError::InternalError); } } @@ -342,7 +337,9 @@ where } else { error!("Internal server error: unexpected eof"); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); - // self.push_response_entry(StatusCode::INTERNAL_SERVER_ERROR); + self.messages.push_back(Message::Error( + Response::InternalServerError().finish(), + )); self.error = Some(DispatchError::InternalError); } } @@ -363,7 +360,7 @@ where } Ok(Async::Ready(None)) => { if self.flags.contains(Flags::READ_DISCONNECTED) { - self.client_disconnected(true); + self.client_disconnected(); } break; } @@ -378,7 +375,8 @@ where } // Malformed requests should be responded with 400 - // self.push_response_entry(StatusCode::BAD_REQUEST); + self.messages + .push_back(Message::Error(Response::BadRequest().finish())); self.flags.insert(Flags::READ_DISCONNECTED | Flags::STARTED); self.error = Some(DispatchError::MalformedRequest); break; diff --git a/src/lib.rs b/src/lib.rs index 32f78517b..4ec097266 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -112,12 +112,10 @@ extern crate tokio; extern crate tokio_codec; extern crate tokio_current_thread; extern crate tokio_io; -extern crate tokio_reactor; extern crate tokio_tcp; extern crate tokio_timer; #[cfg(all(unix, feature = "uds"))] extern crate tokio_uds; -extern crate url; #[cfg(test)] #[macro_use] diff --git a/tests/test_h1v2.rs b/tests/test_h1v2.rs deleted file mode 100644 index 1866f29b4..000000000 --- a/tests/test_h1v2.rs +++ /dev/null @@ -1,46 +0,0 @@ -extern crate actix; -extern crate actix_http; -extern crate actix_net; -extern crate actix_web; -extern crate futures; - -use std::thread; - -use actix::System; -use actix_net::server::Server; -use actix_web::{client, test}; -use futures::future; - -use actix_http::{h1, Error, KeepAlive, Response, ServiceConfig}; - -#[test] -fn test_h1_v2() { - let addr = test::TestServer::unused_addr(); - thread::spawn(move || { - Server::new() - .bind("test", addr, move || { - let settings = ServiceConfig::build() - .keep_alive(KeepAlive::Disabled) - .client_timeout(1000) - .client_disconnect(1000) - .server_hostname("localhost") - .server_address(addr) - .finish(); - - h1::H1Service::new(settings, |req| { - println!("REQ: {:?}", req); - future::ok::<_, Error>(Response::Ok().finish()) - }) - }).unwrap() - .run(); - }); - - let mut sys = System::new("test"); - { - let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) - .finish() - .unwrap(); - let response = sys.block_on(req.send()).unwrap(); - assert!(response.status().is_success()); - } -} diff --git a/tests/test_server.rs b/tests/test_server.rs new file mode 100644 index 000000000..cc21416c0 --- /dev/null +++ b/tests/test_server.rs @@ -0,0 +1,90 @@ +extern crate actix; +extern crate actix_http; +extern crate actix_net; +extern crate actix_web; +extern crate futures; + +use std::{io::Read, io::Write, net, thread, time}; + +use actix::System; +use actix_net::server::Server; +use actix_web::{client, test}; +use futures::future; + +use actix_http::{h1, Error, KeepAlive, Response, ServiceConfig}; + +#[test] +fn test_h1_v2() { + let addr = test::TestServer::unused_addr(); + thread::spawn(move || { + Server::new() + .bind("test", addr, move || { + let settings = ServiceConfig::build() + .keep_alive(KeepAlive::Disabled) + .client_timeout(1000) + .client_disconnect(1000) + .server_hostname("localhost") + .server_address(addr) + .finish(); + + h1::H1Service::new(settings, |_| { + future::ok::<_, Error>(Response::Ok().finish()) + }) + }).unwrap() + .run(); + }); + + let mut sys = System::new("test"); + { + let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) + .finish() + .unwrap(); + let response = sys.block_on(req.send()).unwrap(); + assert!(response.status().is_success()); + } +} + +#[test] +fn test_slow_request() { + let addr = test::TestServer::unused_addr(); + thread::spawn(move || { + Server::new() + .bind("test", addr, move || { + let settings = ServiceConfig::build().client_timeout(100).finish(); + + h1::H1Service::new(settings, |_| { + future::ok::<_, Error>(Response::Ok().finish()) + }) + }).unwrap() + .run(); + }); + thread::sleep(time::Duration::from_millis(100)); + + let mut stream = net::TcpStream::connect(addr).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); + let mut data = String::new(); + let _ = stream.read_to_string(&mut data); + assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); +} + +#[test] +fn test_malformed_request() { + let addr = test::TestServer::unused_addr(); + thread::spawn(move || { + Server::new() + .bind("test", addr, move || { + let settings = ServiceConfig::build().client_timeout(100).finish(); + h1::H1Service::new(settings, |_| { + future::ok::<_, Error>(Response::Ok().finish()) + }) + }).unwrap() + .run(); + }); + thread::sleep(time::Duration::from_millis(100)); + + let mut stream = net::TcpStream::connect(addr).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP1.1\r\n"); + let mut data = String::new(); + let _ = stream.read_to_string(&mut data); + assert!(data.starts_with("HTTP/1.1 400 Bad Request")); +}