mirror of
https://github.com/fafhrd91/actix-web
synced 2025-01-18 05:41:50 +01:00
refactor response generation
This commit is contained in:
parent
78e6149d9f
commit
0e6a67fc26
@ -50,7 +50,7 @@ impl Route for MyRoute {
|
||||
|
||||
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self>
|
||||
{
|
||||
Reply::with(req, httpcodes::HTTPOk)
|
||||
Reply::reply(req, httpcodes::HTTPOk)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,7 @@ use actix::fut::ActorFuture;
|
||||
use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, SpawnHandle};
|
||||
|
||||
use route::{Route, Frame};
|
||||
use httpmessage::HttpResponse;
|
||||
use httpmessage::{HttpRequest, HttpResponse};
|
||||
|
||||
|
||||
/// Actor execution context
|
||||
@ -94,8 +94,8 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
|
||||
}
|
||||
|
||||
/// Start response processing
|
||||
pub fn start(&mut self, response: HttpResponse) {
|
||||
self.stream.push_back(Frame::Message(response))
|
||||
pub fn start<R: Into<HttpResponse>>(&mut self, request: HttpRequest, response: R) {
|
||||
self.stream.push_back(Frame::Message(request, response.into()))
|
||||
}
|
||||
|
||||
/// Write payload
|
||||
|
@ -10,7 +10,7 @@
|
||||
pub use ws;
|
||||
pub use httpcodes;
|
||||
pub use application::Application;
|
||||
pub use httpmessage::{Body, HttpRequest, HttpResponse, IntoHttpResponse};
|
||||
pub use httpmessage::{Body, Builder, HttpRequest, HttpResponse};
|
||||
pub use payload::{Payload, PayloadItem};
|
||||
pub use router::RoutingMap;
|
||||
pub use resource::{Reply, Resource};
|
||||
|
@ -6,7 +6,7 @@ use http::StatusCode;
|
||||
use task::Task;
|
||||
use route::RouteHandler;
|
||||
use payload::Payload;
|
||||
use httpmessage::{Body, HttpRequest, HttpResponse, IntoHttpResponse};
|
||||
use httpmessage::{Body, Builder, HttpRequest, HttpResponse};
|
||||
|
||||
pub const HTTPOk: StaticResponse = StaticResponse(StatusCode::OK);
|
||||
pub const HTTPCreated: StaticResponse = StaticResponse(StatusCode::CREATED);
|
||||
@ -14,25 +14,34 @@ pub const HTTPNoContent: StaticResponse = StaticResponse(StatusCode::NO_CONTENT)
|
||||
pub const HTTPBadRequest: StaticResponse = StaticResponse(StatusCode::BAD_REQUEST);
|
||||
pub const HTTPNotFound: StaticResponse = StaticResponse(StatusCode::NOT_FOUND);
|
||||
pub const HTTPMethodNotAllowed: StaticResponse = StaticResponse(StatusCode::METHOD_NOT_ALLOWED);
|
||||
pub const HTTPInternalServerError: StaticResponse =
|
||||
StaticResponse(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
|
||||
|
||||
pub struct StaticResponse(StatusCode);
|
||||
|
||||
impl StaticResponse {
|
||||
pub fn with_reason(self, req: HttpRequest, reason: &'static str) -> HttpResponse {
|
||||
HttpResponse::new(req, self.0, Body::Empty)
|
||||
.set_reason(reason)
|
||||
pub fn builder(&self) -> Builder {
|
||||
HttpResponse::builder(self.0)
|
||||
}
|
||||
pub fn response(&self) -> HttpResponse {
|
||||
HttpResponse::new(self.0, Body::Empty)
|
||||
}
|
||||
pub fn with_reason(self, reason: &'static str) -> HttpResponse {
|
||||
let mut resp = HttpResponse::new(self.0, Body::Empty);
|
||||
resp.set_reason(reason);
|
||||
resp
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> RouteHandler<S> for StaticResponse {
|
||||
fn handle(&self, req: HttpRequest, _: Payload, _: Rc<S>) -> Task {
|
||||
Task::reply(HttpResponse::new(req, self.0, Body::Empty))
|
||||
Task::reply(req, HttpResponse::new(self.0, Body::Empty))
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoHttpResponse for StaticResponse {
|
||||
fn response(self, req: HttpRequest) -> HttpResponse {
|
||||
HttpResponse::new(req, self.0, Body::Empty)
|
||||
impl From<StaticResponse> for HttpResponse {
|
||||
fn from(st: StaticResponse) -> Self {
|
||||
st.response()
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
//! Pieces pertaining to the HTTP message protocol.
|
||||
use std::{io, mem};
|
||||
use std::error::Error as StdError;
|
||||
use std::convert::Into;
|
||||
|
||||
use bytes::Bytes;
|
||||
use http::{Method, StatusCode, Version, Uri, HeaderMap};
|
||||
use http::{Method, StatusCode, Version, Uri, HeaderMap, HttpTryFrom, Error};
|
||||
use http::header::{self, HeaderName, HeaderValue};
|
||||
|
||||
use Params;
|
||||
use error::Error;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Debug)]
|
||||
pub enum ConnectionType {
|
||||
@ -22,23 +22,6 @@ pub trait Message {
|
||||
|
||||
fn headers(&self) -> &HeaderMap;
|
||||
|
||||
/// Checks if a connection should be kept alive.
|
||||
fn keep_alive(&self) -> bool {
|
||||
if let Some(conn) = self.headers().get(header::CONNECTION) {
|
||||
if let Ok(conn) = conn.to_str() {
|
||||
if self.version() == Version::HTTP_10 && !conn.contains("keep-alive") {
|
||||
false
|
||||
} else {
|
||||
self.version() == Version::HTTP_11 && conn.contains("close")
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
self.version() != Version::HTTP_10
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if a connection is expecting a `100 Continue` before sending its body.
|
||||
#[inline]
|
||||
fn expecting_continue(&self) -> bool {
|
||||
@ -52,13 +35,14 @@ pub trait Message {
|
||||
false
|
||||
}
|
||||
|
||||
fn is_chunked(&self) -> Result<bool, Error> {
|
||||
fn is_chunked(&self) -> Result<bool, io::Error> {
|
||||
if let Some(encodings) = self.headers().get(header::TRANSFER_ENCODING) {
|
||||
if let Ok(s) = encodings.to_str() {
|
||||
return Ok(s.to_lowercase().contains("chunked"))
|
||||
} else {
|
||||
debug!("request with transfer-encoding header, but not chunked, bad request");
|
||||
Err(Error::Header)
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Request with transfer-encoding header, but not chunked"))
|
||||
}
|
||||
} else {
|
||||
Ok(false)
|
||||
@ -160,6 +144,23 @@ impl HttpRequest {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if a connection should be kept alive.
|
||||
pub fn keep_alive(&self) -> bool {
|
||||
if let Some(conn) = self.headers.get(header::CONNECTION) {
|
||||
if let Ok(conn) = conn.to_str() {
|
||||
if self.version == Version::HTTP_10 && !conn.contains("keep-alive") {
|
||||
false
|
||||
} else {
|
||||
self.version == Version::HTTP_11 && conn.contains("close")
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
self.version != Version::HTTP_10
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_upgrade(&self) -> bool {
|
||||
if let Some(conn) = self.headers().get(header::CONNECTION) {
|
||||
if let Ok(s) = conn.to_str() {
|
||||
@ -196,23 +197,15 @@ impl Body {
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements by something that can be converted to `HttpResponse`
|
||||
pub trait IntoHttpResponse {
|
||||
/// Convert into response.
|
||||
fn response(self, req: HttpRequest) -> HttpResponse;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// An HTTP Response
|
||||
pub struct HttpResponse {
|
||||
request: HttpRequest,
|
||||
pub version: Version,
|
||||
pub headers: HeaderMap,
|
||||
pub status: StatusCode,
|
||||
reason: Option<&'static str>,
|
||||
body: Body,
|
||||
chunked: bool,
|
||||
// compression: Option<Encoding>,
|
||||
connection_type: Option<ConnectionType>,
|
||||
}
|
||||
|
||||
@ -226,13 +219,20 @@ impl Message for HttpResponse {
|
||||
}
|
||||
|
||||
impl HttpResponse {
|
||||
|
||||
#[inline]
|
||||
pub fn builder(status: StatusCode) -> Builder {
|
||||
Builder {
|
||||
parts: Some(Parts::new(status)),
|
||||
err: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs a response
|
||||
#[inline]
|
||||
pub fn new(request: HttpRequest, status: StatusCode, body: Body) -> HttpResponse {
|
||||
let version = request.version;
|
||||
pub fn new(status: StatusCode, body: Body) -> HttpResponse {
|
||||
HttpResponse {
|
||||
request: request,
|
||||
version: version,
|
||||
version: Version::HTTP_11,
|
||||
headers: Default::default(),
|
||||
status: status,
|
||||
reason: None,
|
||||
@ -243,12 +243,6 @@ impl HttpResponse {
|
||||
}
|
||||
}
|
||||
|
||||
/// Original prequest
|
||||
#[inline]
|
||||
pub fn request(&self) -> &HttpRequest {
|
||||
&self.request
|
||||
}
|
||||
|
||||
/// Get the HTTP version of this response.
|
||||
#[inline]
|
||||
pub fn version(&self) -> Version {
|
||||
@ -275,34 +269,19 @@ impl HttpResponse {
|
||||
|
||||
/// Set the `StatusCode` for this response.
|
||||
#[inline]
|
||||
pub fn set_status(mut self, status: StatusCode) -> Self {
|
||||
self.status = status;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a header and move the Response.
|
||||
#[inline]
|
||||
pub fn set_header(mut self, name: HeaderName, value: HeaderValue) -> Self {
|
||||
self.headers.insert(name, value);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the headers.
|
||||
#[inline]
|
||||
pub fn with_headers(mut self, headers: HeaderMap) -> Self {
|
||||
self.headers = headers;
|
||||
self
|
||||
pub fn status_mut(&mut self) -> &mut StatusCode {
|
||||
&mut self.status
|
||||
}
|
||||
|
||||
/// Set the custom reason for the response.
|
||||
#[inline]
|
||||
pub fn set_reason(mut self, reason: &'static str) -> Self {
|
||||
pub fn set_reason(&mut self, reason: &'static str) -> &mut Self {
|
||||
self.reason = Some(reason);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set connection type
|
||||
pub fn set_connection_type(mut self, conn: ConnectionType) -> Self {
|
||||
pub fn set_connection_type(&mut self, conn: ConnectionType) -> &mut Self{
|
||||
self.connection_type = Some(conn);
|
||||
self
|
||||
}
|
||||
@ -313,11 +292,11 @@ impl HttpResponse {
|
||||
}
|
||||
|
||||
/// Keep-alive status for this connection
|
||||
pub fn keep_alive(&self) -> bool {
|
||||
pub fn keep_alive(&self) -> Option<bool> {
|
||||
if let Some(ConnectionType::KeepAlive) = self.connection_type {
|
||||
true
|
||||
Some(true)
|
||||
} else {
|
||||
self.request.keep_alive()
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@ -348,9 +327,8 @@ impl HttpResponse {
|
||||
}
|
||||
|
||||
/// Set a body
|
||||
pub fn set_body<B: Into<Body>>(mut self, body: B) -> Self {
|
||||
pub fn set_body<B: Into<Body>>(&mut self, body: B) {
|
||||
self.body = body.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a body and return previous body value
|
||||
@ -358,3 +336,134 @@ impl HttpResponse {
|
||||
mem::replace(&mut self.body, body.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Error> for HttpResponse {
|
||||
fn from(err: Error) -> Self {
|
||||
HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Body::Binary(err.description().into()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Parts {
|
||||
version: Version,
|
||||
headers: HeaderMap,
|
||||
status: StatusCode,
|
||||
reason: Option<&'static str>,
|
||||
chunked: bool,
|
||||
connection_type: Option<ConnectionType>,
|
||||
}
|
||||
|
||||
impl Parts {
|
||||
fn new(status: StatusCode) -> Self {
|
||||
Parts {
|
||||
version: Version::default(),
|
||||
headers: HeaderMap::new(),
|
||||
status: status,
|
||||
reason: None,
|
||||
chunked: false,
|
||||
connection_type: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// An HTTP response builder
|
||||
///
|
||||
/// This type can be used to construct an instance of `HttpResponse` through a
|
||||
/// builder-like pattern.
|
||||
#[derive(Debug)]
|
||||
pub struct Builder {
|
||||
parts: Option<Parts>,
|
||||
err: Option<Error>,
|
||||
}
|
||||
|
||||
impl Builder {
|
||||
/// Get the HTTP version of this response.
|
||||
#[inline]
|
||||
pub fn version(&mut self, version: Version) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.parts, &self.err) {
|
||||
parts.version = version;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the `StatusCode` for this response.
|
||||
#[inline]
|
||||
pub fn status(&mut self, status: StatusCode) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.parts, &self.err) {
|
||||
parts.status = status;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a header.
|
||||
#[inline]
|
||||
pub fn header<K, V>(&mut self, key: K, value: V) -> &mut Self
|
||||
where HeaderName: HttpTryFrom<K>,
|
||||
HeaderValue: HttpTryFrom<V>
|
||||
{
|
||||
if let Some(parts) = parts(&mut self.parts, &self.err) {
|
||||
match HeaderName::try_from(key) {
|
||||
Ok(key) => {
|
||||
match HeaderValue::try_from(value) {
|
||||
Ok(value) => { parts.headers.append(key, value); }
|
||||
Err(e) => self.err = Some(e.into()),
|
||||
}
|
||||
},
|
||||
Err(e) => self.err = Some(e.into()),
|
||||
};
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the custom reason for the response.
|
||||
#[inline]
|
||||
pub fn reason(&mut self, reason: &'static str) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.parts, &self.err) {
|
||||
parts.reason = Some(reason);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Set connection type
|
||||
pub fn connection_type(mut self, conn: ConnectionType) -> Self {
|
||||
if let Some(parts) = parts(&mut self.parts, &self.err) {
|
||||
parts.connection_type = Some(conn);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables automatic chunked transfer encoding
|
||||
pub fn enable_chunked(&mut self) -> &mut Self {
|
||||
if let Some(parts) = parts(&mut self.parts, &self.err) {
|
||||
parts.chunked = true;
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a body
|
||||
pub fn body<B: Into<Body>>(&mut self, body: B) -> Result<HttpResponse, Error> {
|
||||
let parts = self.parts.take().expect("cannot reuse response builder");
|
||||
if let Some(e) = self.err.take() {
|
||||
return Err(e)
|
||||
}
|
||||
Ok(HttpResponse {
|
||||
version: parts.version,
|
||||
headers: parts.headers,
|
||||
status: parts.status,
|
||||
reason: parts.reason,
|
||||
body: body.into(),
|
||||
chunked: parts.chunked,
|
||||
connection_type: parts.connection_type,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn parts<'a>(parts: &'a mut Option<Parts>, err: &Option<Error>) -> Option<&'a mut Parts>
|
||||
{
|
||||
if err.is_some() {
|
||||
return None
|
||||
}
|
||||
parts.as_mut()
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ pub mod ws;
|
||||
pub mod dev;
|
||||
pub mod httpcodes;
|
||||
pub use application::Application;
|
||||
pub use httpmessage::{Body, HttpRequest, HttpResponse, IntoHttpResponse};
|
||||
pub use httpmessage::{Body, Builder, HttpRequest, HttpResponse};
|
||||
pub use payload::{Payload, PayloadItem};
|
||||
pub use router::RoutingMap;
|
||||
pub use resource::{Reply, Resource};
|
||||
|
10
src/main.rs
10
src/main.rs
@ -24,7 +24,7 @@ impl Route for MyRoute {
|
||||
ctx.add_stream(payload);
|
||||
Reply::stream(MyRoute{req: Some(req)})
|
||||
} else {
|
||||
Reply::with(req, httpcodes::HTTPOk)
|
||||
Reply::reply(req, httpcodes::HTTPOk)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -42,7 +42,7 @@ impl Handler<PayloadItem> for MyRoute {
|
||||
{
|
||||
println!("CHUNK: {:?}", msg);
|
||||
if let Some(req) = self.req.take() {
|
||||
ctx.start(httpcodes::HTTPOk.response(req));
|
||||
ctx.start(req, httpcodes::HTTPOk);
|
||||
ctx.write_eof();
|
||||
}
|
||||
Self::empty()
|
||||
@ -59,14 +59,14 @@ impl Route for MyWS {
|
||||
type State = ();
|
||||
|
||||
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self> {
|
||||
match ws::handshake(req) {
|
||||
match ws::handshake(&req) {
|
||||
Ok(resp) => {
|
||||
ctx.start(resp);
|
||||
ctx.start(req, resp);
|
||||
ctx.add_stream(ws::WsStream::new(payload));
|
||||
Reply::stream(MyWS{})
|
||||
},
|
||||
Err(err) =>
|
||||
Reply::reply(err)
|
||||
Reply::reply(req, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ use route::{Route, RouteHandler};
|
||||
use payload::Payload;
|
||||
use context::HttpContext;
|
||||
use httpcodes::HTTPMethodNotAllowed;
|
||||
use httpmessage::{HttpRequest, HttpResponse, IntoHttpResponse};
|
||||
use httpmessage::{HttpRequest, HttpResponse};
|
||||
|
||||
/// Http resource
|
||||
///
|
||||
@ -109,7 +109,7 @@ impl<S: 'static> RouteHandler<S> for Resource<S> {
|
||||
|
||||
#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))]
|
||||
enum ReplyItem<A> where A: Actor + Route {
|
||||
Message(HttpResponse),
|
||||
Message(HttpRequest, HttpResponse),
|
||||
Actor(A),
|
||||
}
|
||||
|
||||
@ -124,20 +124,15 @@ impl<A> Reply<A> where A: Actor + Route
|
||||
}
|
||||
|
||||
/// Send response
|
||||
pub fn reply(msg: HttpResponse) -> Self {
|
||||
Reply(ReplyItem::Message(msg))
|
||||
}
|
||||
|
||||
/// Send response
|
||||
pub fn with<I: IntoHttpResponse>(req: HttpRequest, msg: I) -> Self {
|
||||
Reply(ReplyItem::Message(msg.response(req)))
|
||||
pub fn reply<R: Into<HttpResponse>>(req: HttpRequest, response: R) -> Self {
|
||||
Reply(ReplyItem::Message(req, response.into()))
|
||||
}
|
||||
|
||||
pub fn into(self, mut ctx: HttpContext<A>) -> Task where A: Actor<Context=HttpContext<A>>
|
||||
{
|
||||
match self.0 {
|
||||
ReplyItem::Message(msg) => {
|
||||
Task::reply(msg)
|
||||
ReplyItem::Message(req, msg) => {
|
||||
Task::reply(req, msg)
|
||||
},
|
||||
ReplyItem::Actor(act) => {
|
||||
ctx.set_actor(act);
|
||||
|
@ -14,7 +14,7 @@ use httpmessage::{HttpRequest, HttpResponse};
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))]
|
||||
pub enum Frame {
|
||||
Message(HttpResponse),
|
||||
Message(HttpRequest, HttpResponse),
|
||||
Payload(Option<Bytes>),
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,7 @@ use route::RouteHandler;
|
||||
use resource::Resource;
|
||||
use application::Application;
|
||||
use httpcodes::HTTPNotFound;
|
||||
use httpmessage::{HttpRequest, IntoHttpResponse};
|
||||
use httpmessage::HttpRequest;
|
||||
|
||||
pub(crate) trait Handler: 'static {
|
||||
fn handle(&self, req: HttpRequest, payload: Payload) -> Task;
|
||||
@ -138,7 +138,7 @@ impl Router {
|
||||
return app.handle(req, payload)
|
||||
}
|
||||
}
|
||||
Task::reply(IntoHttpResponse::response(HTTPNotFound, req))
|
||||
Task::reply(req, HTTPNotFound.response())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
21
src/task.rs
21
src/task.rs
@ -12,7 +12,7 @@ use tokio_core::net::TcpStream;
|
||||
|
||||
use date;
|
||||
use route::Frame;
|
||||
use httpmessage::{Body, HttpResponse};
|
||||
use httpmessage::{Body, HttpRequest, HttpResponse};
|
||||
|
||||
type FrameStream = Stream<Item=Frame, Error=io::Error>;
|
||||
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
|
||||
@ -56,9 +56,9 @@ pub struct Task {
|
||||
|
||||
impl Task {
|
||||
|
||||
pub(crate) fn reply(msg: HttpResponse) -> Self {
|
||||
pub fn reply(req: HttpRequest, msg: HttpResponse) -> Self {
|
||||
let mut frames = VecDeque::new();
|
||||
frames.push_back(Frame::Message(msg));
|
||||
frames.push_back(Frame::Message(req, msg));
|
||||
frames.push_back(Frame::Payload(None));
|
||||
|
||||
Task {
|
||||
@ -86,7 +86,7 @@ impl Task {
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare(&mut self, mut msg: HttpResponse)
|
||||
fn prepare(&mut self, req: HttpRequest, mut msg: HttpResponse)
|
||||
{
|
||||
trace!("Prepare message status={:?}", msg.status);
|
||||
|
||||
@ -143,7 +143,7 @@ impl Task {
|
||||
msg.headers.insert(CONNECTION, HeaderValue::from_static("upgrade"));
|
||||
}
|
||||
// keep-alive
|
||||
else if msg.keep_alive() {
|
||||
else if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
|
||||
if msg.version < Version::HTTP_11 {
|
||||
msg.headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
|
||||
}
|
||||
@ -184,7 +184,7 @@ impl Task {
|
||||
|
||||
self.buffer.extend(b"\r\n");
|
||||
|
||||
if let Body::Binary(ref bytes) = *msg.body() {
|
||||
if let Body::Binary(ref bytes) = body {
|
||||
self.buffer.extend(bytes);
|
||||
return
|
||||
}
|
||||
@ -192,7 +192,7 @@ impl Task {
|
||||
}
|
||||
|
||||
pub(crate) fn poll_io(&mut self, io: &mut TcpStream) -> Poll<bool, ()> {
|
||||
println!("POLL-IO {:?}", self.frames.len());
|
||||
trace!("POLL-IO frames:{:?}", self.frames.len());
|
||||
// response is completed
|
||||
if self.frames.is_empty() && self.iostate.is_done() {
|
||||
return Ok(Async::Ready(self.state.is_done()));
|
||||
@ -210,9 +210,10 @@ impl Task {
|
||||
|
||||
// use exiting frames
|
||||
while let Some(frame) = self.frames.pop_front() {
|
||||
trace!("IO Frame: {:?}", frame);
|
||||
match frame {
|
||||
Frame::Message(message) => {
|
||||
self.prepare(message);
|
||||
Frame::Message(request, response) => {
|
||||
self.prepare(request, response);
|
||||
}
|
||||
Frame::Payload(chunk) => {
|
||||
match chunk {
|
||||
@ -275,7 +276,7 @@ impl Future for Task {
|
||||
match stream.poll() {
|
||||
Ok(Async::Ready(Some(frame))) => {
|
||||
match frame {
|
||||
Frame::Message(ref msg) => {
|
||||
Frame::Message(_, ref msg) => {
|
||||
if self.iostate != TaskIOState::ReadingMessage {
|
||||
error!("Non expected frame {:?}", frame);
|
||||
return Err(())
|
||||
|
38
src/ws.rs
38
src/ws.rs
@ -24,17 +24,17 @@
|
||||
//! fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self>
|
||||
//! {
|
||||
//! // WebSocket handshake
|
||||
//! match ws::handshake(req) {
|
||||
//! match ws::handshake(&req) {
|
||||
//! Ok(resp) => {
|
||||
//! // Send handshake response to peer
|
||||
//! ctx.start(resp);
|
||||
//! ctx.start(req, resp);
|
||||
//! // Map Payload into WsStream
|
||||
//! ctx.add_stream(ws::WsStream::new(payload));
|
||||
//! // Start ws messages processing
|
||||
//! Reply::stream(WsRoute)
|
||||
//! },
|
||||
//! Err(err) =>
|
||||
//! Reply::reply(err)
|
||||
//! Reply::reply(req, err)
|
||||
//! }
|
||||
//! }
|
||||
//! }
|
||||
@ -64,7 +64,6 @@
|
||||
//! fn main() {}
|
||||
//! ```
|
||||
use std::vec::Vec;
|
||||
use std::str::FromStr;
|
||||
use http::{Method, StatusCode, header};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{Async, Poll, Stream};
|
||||
@ -75,7 +74,7 @@ use context::HttpContext;
|
||||
use route::Route;
|
||||
use payload::Payload;
|
||||
use httpcodes::{HTTPBadRequest, HTTPMethodNotAllowed};
|
||||
use httpmessage::{Body, ConnectionType, HttpRequest, HttpResponse, IntoHttpResponse};
|
||||
use httpmessage::{Body, ConnectionType, HttpRequest, HttpResponse};
|
||||
|
||||
use wsframe;
|
||||
use wsproto::*;
|
||||
@ -110,10 +109,10 @@ pub enum Message {
|
||||
// /// `protocols` is a sequence of known protocols. On successful handshake,
|
||||
// /// the returned response headers contain the first protocol in this list
|
||||
// /// which the server also knows.
|
||||
pub fn handshake(req: HttpRequest) -> Result<HttpResponse, HttpResponse> {
|
||||
pub fn handshake(req: &HttpRequest) -> Result<HttpResponse, HttpResponse> {
|
||||
// WebSocket accepts only GET
|
||||
if *req.method() != Method::GET {
|
||||
return Err(HTTPMethodNotAllowed.response(req))
|
||||
return Err(HTTPMethodNotAllowed.response())
|
||||
}
|
||||
|
||||
// Check for "UPGRADE" to websocket header
|
||||
@ -127,17 +126,17 @@ pub fn handshake(req: HttpRequest) -> Result<HttpResponse, HttpResponse> {
|
||||
false
|
||||
};
|
||||
if !has_hdr {
|
||||
return Err(HTTPMethodNotAllowed.with_reason(req, "No WebSocket UPGRADE header found"))
|
||||
return Err(HTTPMethodNotAllowed.with_reason("No WebSocket UPGRADE header found"))
|
||||
}
|
||||
|
||||
// Upgrade connection
|
||||
if !req.is_upgrade() {
|
||||
return Err(HTTPBadRequest.with_reason(req, "No CONNECTION upgrade"))
|
||||
return Err(HTTPBadRequest.with_reason("No CONNECTION upgrade"))
|
||||
}
|
||||
|
||||
// check supported version
|
||||
if !req.headers().contains_key(SEC_WEBSOCKET_VERSION) {
|
||||
return Err(HTTPBadRequest.with_reason(req, "No websocket version header is required"))
|
||||
return Err(HTTPBadRequest.with_reason("No websocket version header is required"))
|
||||
}
|
||||
let supported_ver = {
|
||||
if let Some(hdr) = req.headers().get(SEC_WEBSOCKET_VERSION) {
|
||||
@ -147,25 +146,24 @@ pub fn handshake(req: HttpRequest) -> Result<HttpResponse, HttpResponse> {
|
||||
}
|
||||
};
|
||||
if !supported_ver {
|
||||
return Err(HTTPBadRequest.with_reason(req, "Unsupported version"))
|
||||
return Err(HTTPBadRequest.with_reason("Unsupported version"))
|
||||
}
|
||||
|
||||
// check client handshake for validity
|
||||
if !req.headers().contains_key(SEC_WEBSOCKET_KEY) {
|
||||
return Err(HTTPBadRequest.with_reason(req, "Handshake error"));
|
||||
return Err(HTTPBadRequest.with_reason("Handshake error"));
|
||||
}
|
||||
let key = {
|
||||
let key = req.headers().get(SEC_WEBSOCKET_KEY).unwrap();
|
||||
hash_key(key.as_ref())
|
||||
};
|
||||
|
||||
Ok(HttpResponse::new(req, StatusCode::SWITCHING_PROTOCOLS, Body::Empty)
|
||||
.set_connection_type(ConnectionType::Upgrade)
|
||||
.set_header(header::UPGRADE, header::HeaderValue::from_static("websocket"))
|
||||
.set_header(header::TRANSFER_ENCODING, header::HeaderValue::from_static("chunked"))
|
||||
.set_header(header::HeaderName::from_str(SEC_WEBSOCKET_ACCEPT).unwrap(),
|
||||
header::HeaderValue::from_str(key.as_str()).unwrap())
|
||||
.set_body(Body::Upgrade)
|
||||
Ok(HttpResponse::builder(StatusCode::SWITCHING_PROTOCOLS)
|
||||
.connection_type(ConnectionType::Upgrade)
|
||||
.header(header::UPGRADE, "websocket")
|
||||
.header(header::TRANSFER_ENCODING, "chunked")
|
||||
.header(SEC_WEBSOCKET_ACCEPT, key.as_str())
|
||||
.body(Body::Upgrade)?
|
||||
)
|
||||
}
|
||||
|
||||
@ -204,7 +202,7 @@ impl Stream for WsStream {
|
||||
loop {
|
||||
match wsframe::Frame::parse(&mut self.buf) {
|
||||
Ok(Some(frame)) => {
|
||||
trace!("Frame {}", frame);
|
||||
trace!("WsFrame {}", frame);
|
||||
let (_finished, opcode, payload) = frame.unpack();
|
||||
|
||||
match opcode {
|
||||
|
Loading…
x
Reference in New Issue
Block a user