From 0447c66de1c6fa0969c86d52f0e11d22dec42e31 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 13 Oct 2017 14:43:17 -0700 Subject: [PATCH] simplify Frame::Message; impl Try for Reply --- Cargo.toml | 7 +++ src/context.rs | 6 +-- src/error.rs | 110 ++++++++++++++++++++++++++------------------- src/httpcodes.rs | 4 +- src/httpmessage.rs | 23 ++++++---- src/lib.rs | 5 +++ src/main.rs | 21 ++++----- src/reader.rs | 53 ++++++++-------------- src/resource.rs | 40 ++++++++++++++--- src/route.rs | 2 +- src/router.rs | 2 +- src/server.rs | 58 +++++++++++++++++------- src/task.rs | 36 ++++++++++----- 13 files changed, 226 insertions(+), 141 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6d85a815..8b00260c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,17 @@ path = "src/lib.rs" name = "test" path = "src/main.rs" +[features] +default = ["nightly"] + +# Enable nightly features +nightly = [] + [dependencies] time = "0.1" http = "0.1" httparse = "0.1" +cookie = { version="0.10", features=["percent-encode"] } slab = "0.4" sha1 = "0.2" url = "1.5" diff --git a/src/context.rs b/src/context.rs index 2941d71c..7a82bf84 100644 --- a/src/context.rs +++ b/src/context.rs @@ -9,7 +9,7 @@ use actix::fut::ActorFuture; use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, SpawnHandle}; use route::{Route, Frame}; -use httpmessage::{HttpRequest, HttpResponse}; +use httpmessage::HttpResponse; /// Actor execution context @@ -102,8 +102,8 @@ impl HttpContext where A: Actor + Route { } /// Start response processing - pub fn start>(&mut self, request: HttpRequest, response: R) { - self.stream.push_back(Frame::Message(request, response.into())) + pub fn start>(&mut self, response: R) { + self.stream.push_back(Frame::Message(response.into())) } /// Write payload diff --git a/src/error.rs b/src/error.rs index b4525b30..57badbf2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,27 +5,15 @@ use std::io::Error as IoError; use std::str::Utf8Error; use std::string::FromUtf8Error; +use cookie; use httparse; +use http::{StatusCode, Error as HttpError}; -use self::Error::{ - Method, - Uri, - Version, - Header, - Status, - Timeout, - Io, - TooLarge, - Incomplete, - Utf8 -}; - -/// Result type often returned from methods that can have error. -pub type Result = ::std::result::Result; +use httpmessage::{Body, HttpResponse}; /// A set of errors that can occur parsing HTTP streams. #[derive(Debug)] -pub enum Error { +pub enum ParseError { /// An invalid `Method`, such as `GE,T`. Method, /// An invalid `Uri`, such as `exam ple.domain`. @@ -43,79 +31,107 @@ pub enum Error { /// A timeout occurred waiting for an IO event. #[allow(dead_code)] Timeout, + /// Unexpected EOF during parsing + Eof, /// An `io::Error` that occurred while trying to read or write to a network stream. Io(IoError), /// Parsing a field as string failed Utf8(Utf8Error), } -impl fmt::Display for Error { +impl fmt::Display for ParseError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - Io(ref e) => fmt::Display::fmt(e, f), - Utf8(ref e) => fmt::Display::fmt(e, f), + ParseError::Io(ref e) => fmt::Display::fmt(e, f), + ParseError::Utf8(ref e) => fmt::Display::fmt(e, f), ref e => f.write_str(e.description()), } } } -impl StdError for Error { +impl StdError for ParseError { fn description(&self) -> &str { match *self { - Method => "Invalid Method specified", - Version => "Invalid HTTP version specified", - Header => "Invalid Header provided", - TooLarge => "Message head is too large", - Status => "Invalid Status provided", - Incomplete => "Message is incomplete", - Timeout => "Timeout", - Uri => "Uri error", - Io(ref e) => e.description(), - Utf8(ref e) => e.description(), + ParseError::Method => "Invalid Method specified", + ParseError::Version => "Invalid HTTP version specified", + ParseError::Header => "Invalid Header provided", + ParseError::TooLarge => "Message head is too large", + ParseError::Status => "Invalid Status provided", + ParseError::Incomplete => "Message is incomplete", + ParseError::Timeout => "Timeout", + ParseError::Uri => "Uri error", + ParseError::Eof => "Unexpected eof during parse", + ParseError::Io(ref e) => e.description(), + ParseError::Utf8(ref e) => e.description(), } } fn cause(&self) -> Option<&StdError> { match *self { - Io(ref error) => Some(error), - Utf8(ref error) => Some(error), + ParseError::Io(ref error) => Some(error), + ParseError::Utf8(ref error) => Some(error), _ => None, } } } -impl From for Error { - fn from(err: IoError) -> Error { - Io(err) +impl From for ParseError { + fn from(err: IoError) -> ParseError { + ParseError::Io(err) } } -impl From for Error { - fn from(err: Utf8Error) -> Error { - Utf8(err) +impl From for ParseError { + fn from(err: Utf8Error) -> ParseError { + ParseError::Utf8(err) } } -impl From for Error { - fn from(err: FromUtf8Error) -> Error { - Utf8(err.utf8_error()) +impl From for ParseError { + fn from(err: FromUtf8Error) -> ParseError { + ParseError::Utf8(err.utf8_error()) } } -impl From for Error { - fn from(err: httparse::Error) -> Error { +impl From for ParseError { + fn from(err: httparse::Error) -> ParseError { match err { httparse::Error::HeaderName | httparse::Error::HeaderValue | httparse::Error::NewLine | - httparse::Error::Token => Header, - httparse::Error::Status => Status, - httparse::Error::TooManyHeaders => TooLarge, - httparse::Error::Version => Version, + httparse::Error::Token => ParseError::Header, + httparse::Error::Status => ParseError::Status, + httparse::Error::TooManyHeaders => ParseError::TooLarge, + httparse::Error::Version => ParseError::Version, } } } +/// Return BadRequest for ParseError +impl From for HttpResponse { + fn from(err: ParseError) -> Self { + HttpResponse::new(StatusCode::BAD_REQUEST, + Body::Binary(err.description().into())) + } +} + +/// Return InternalServerError for HttpError, +/// Response generation can return HttpError, so it is internal error +impl From for HttpResponse { + fn from(err: HttpError) -> Self { + HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR, + Body::Binary(err.description().into())) + } +} + +/// Return BadRequest for cookie::ParseError +impl From for HttpResponse { + fn from(err: cookie::ParseError) -> Self { + HttpResponse::new(StatusCode::BAD_REQUEST, + Body::Binary(err.description().into())) + } +} + #[cfg(test)] mod tests { use std::error::Error as StdError; diff --git a/src/httpcodes.rs b/src/httpcodes.rs index 4e5e6558..76470da1 100644 --- a/src/httpcodes.rs +++ b/src/httpcodes.rs @@ -35,8 +35,8 @@ impl StaticResponse { } impl RouteHandler for StaticResponse { - fn handle(&self, req: HttpRequest, _: Payload, _: Rc) -> Task { - Task::reply(req, HttpResponse::new(self.0, Body::Empty)) + fn handle(&self, _: HttpRequest, _: Payload, _: Rc) -> Task { + Task::reply(HttpResponse::new(self.0, Body::Empty)) } } diff --git a/src/httpmessage.rs b/src/httpmessage.rs index 50c1d25f..ad2a09ac 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -1,8 +1,8 @@ //! Pieces pertaining to the HTTP message protocol. -use std::{io, mem}; -use std::error::Error as StdError; +use std::{io, mem, str}; use std::convert::Into; +use cookie; use bytes::Bytes; use http::{Method, StatusCode, Version, Uri, HeaderMap, HttpTryFrom, Error}; use http::header::{self, HeaderName, HeaderValue}; @@ -78,6 +78,17 @@ impl HttpRequest { self.uri.query() } + /// Return request cookie. + pub fn cookie(&self) -> Result, cookie::ParseError> { + if let Some(val) = self.headers.get(header::COOKIE) { + let s = str::from_utf8(val.as_bytes()) + .map_err(|e| cookie::ParseError::from(e))?; + cookie::Cookie::parse(s).map(|c| Some(c)) + } else { + Ok(None) + } + } + /// Get a mutable reference to the Request headers. #[inline] pub fn headers_mut(&mut self) -> &mut HeaderMap { @@ -300,13 +311,7 @@ impl HttpResponse { } } -impl From for HttpResponse { - fn from(err: Error) -> Self { - HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR, - Body::Binary(err.description().into())) - } -} - +/// Helper conversion implementation impl, E: Into> From> for HttpResponse { fn from(res: Result) -> Self { match res { diff --git a/src/lib.rs b/src/lib.rs index 98839bcb..47d12b7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,16 @@ //! Http framework for [Actix](https://github.com/fafhrd91/actix) +#![cfg_attr(feature="nightly", feature( + try_trait, // std::ops::Try #42327 +))] + #[macro_use] extern crate log; extern crate time; extern crate bytes; extern crate sha1; extern crate url; +extern crate cookie; #[macro_use] extern crate futures; extern crate tokio_core; diff --git a/src/main.rs b/src/main.rs index 125e000f..32153b8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![feature(try_trait)] #![allow(dead_code, unused_variables)] extern crate actix; extern crate actix_http; @@ -24,7 +25,7 @@ impl Route for MyRoute { ctx.add_stream(payload); Reply::stream(MyRoute{req: Some(req)}) } else { - Reply::reply(req, httpcodes::HTTPOk) + Reply::reply(httpcodes::HTTPOk) } } } @@ -42,7 +43,7 @@ impl Handler for MyRoute { { println!("CHUNK: {:?}", msg); if let Some(req) = self.req.take() { - ctx.start(req, httpcodes::HTTPOk); + ctx.start(httpcodes::HTTPOk); ctx.write_eof(); } Self::empty() @@ -58,16 +59,12 @@ impl Actor for MyWS { impl Route for MyWS { type State = (); - fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { - match ws::handshake(&req) { - Ok(resp) => { - ctx.start(req, resp); - ctx.add_stream(ws::WsStream::new(payload)); - Reply::stream(MyWS{}) - }, - Err(err) => - Reply::reply(req, err) - } + fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply + { + let resp = ws::handshake(&req)?; + ctx.start(resp); + ctx.add_stream(ws::WsStream::new(payload)); + Reply::stream(MyWS{}) } } diff --git a/src/reader.rs b/src/reader.rs index f93a6bb2..ea06b3ba 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,4 +1,4 @@ -use std::{self, fmt, io, ptr}; +use std::{self, io, ptr}; use httparse; use http::{Method, Version, Uri, HttpTryFrom, HeaderMap}; @@ -7,7 +7,7 @@ use bytes::{BytesMut, BufMut}; use futures::{Async, Poll}; use tokio_io::AsyncRead; -use error::{Error, Result}; +use error::ParseError; use decode::Decoder; use httpmessage::HttpRequest; use payload::{Payload, PayloadSender}; @@ -53,7 +53,7 @@ impl Reader { } } - fn decode(&mut self) -> std::result::Result + fn decode(&mut self) -> std::result::Result { if let Some(ref mut payload) = self.payload { if payload.tx.maybe_paused() { @@ -69,7 +69,7 @@ impl Reader { return Ok(Decoding::Ready) }, Ok(Async::NotReady) => return Ok(Decoding::NotReady), - Err(_) => return Err(Error::Incomplete), + Err(_) => return Err(ParseError::Incomplete), } } } else { @@ -77,7 +77,7 @@ impl Reader { } } - pub fn parse(&mut self, io: &mut T) -> Poll<(HttpRequest, Payload), Error> + pub fn parse(&mut self, io: &mut T) -> Poll<(HttpRequest, Payload), ParseError> where T: AsyncRead { loop { @@ -89,8 +89,7 @@ impl Reader { }, Decoding::NotReady => { if 0 == try_ready!(self.read_from_io(io)) { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, ParseEof).into()); + return Err(ParseError::Eof) } } } @@ -119,8 +118,7 @@ impl Reader { match self.read_from_io(io) { Ok(Async::Ready(0)) => { trace!("parse eof"); - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, ParseEof).into()); + return Err(ParseError::Eof); } Ok(Async::Ready(_)) => { continue @@ -141,13 +139,13 @@ impl Reader { None => { if self.read_buf.capacity() >= MAX_BUFFER_SIZE { debug!("MAX_BUFFER_SIZE reached, closing"); - return Err(Error::TooLarge); + return Err(ParseError::TooLarge); } }, } if 0 == try_ready!(self.read_from_io(io)) { trace!("parse eof"); - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof).into()); + return Err(ParseError::Eof); } } } @@ -177,23 +175,9 @@ impl Reader { } } -#[derive(Debug)] -struct ParseEof; -impl fmt::Display for ParseEof { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("parse eof") - } -} - -impl ::std::error::Error for ParseEof { - fn description(&self) -> &str { - "parse eof" - } -} - - -pub fn parse(buf: &mut BytesMut) -> Result)>> { +pub fn parse(buf: &mut BytesMut) -> Result)>, ParseError> +{ if buf.is_empty() { return Ok(None); } @@ -211,7 +195,8 @@ pub fn parse(buf: &mut BytesMut) -> Result) match try!(req.parse(buf)) { httparse::Status::Complete(len) => { trace!("Request.parse Complete({})", len); - let method = Method::try_from(req.method.unwrap()).map_err(|_| Error::Method)?; + let method = Method::try_from(req.method.unwrap()) + .map_err(|_| ParseError::Method)?; let path = req.path.unwrap(); let bytes_ptr = buf.as_ref().as_ptr() as usize; let path_start = path.as_ptr() as usize - bytes_ptr; @@ -235,7 +220,7 @@ pub fn parse(buf: &mut BytesMut) -> Result) let slice = buf.split_to(len).freeze(); let path = slice.slice(path.0, path.1); // path was found to be utf8 by httparse - let uri = Uri::from_shared(path).map_err(|_| Error::Uri)?; + let uri = Uri::from_shared(path).map_err(|_| ParseError::Uri)?; // convert headers let mut headers = HeaderMap::with_capacity(headers_len); @@ -246,10 +231,10 @@ pub fn parse(buf: &mut BytesMut) -> Result) { headers.insert(name, value); } else { - return Err(Error::Header) + return Err(ParseError::Header) } } else { - return Err(Error::Header) + return Err(ParseError::Header) } } @@ -263,18 +248,18 @@ pub fn parse(buf: &mut BytesMut) -> Result) // Content-Length else if let Some(len) = msg.headers().get(header::CONTENT_LENGTH) { if chunked { - return Err(Error::Header) + return Err(ParseError::Header) } if let Ok(s) = len.to_str() { if let Ok(len) = s.parse::() { Some(Decoder::length(len)) } else { debug!("illegal Content-Length: {:?}", len); - return Err(Error::Header) + return Err(ParseError::Header) } } else { debug!("illegal Content-Length: {:?}", len); - return Err(Error::Header) + return Err(ParseError::Header) } } else if chunked { Some(Decoder::chunked()) diff --git a/src/resource.rs b/src/resource.rs index c472b321..7ecf7c5e 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -1,4 +1,5 @@ use std::rc::Rc; +use std::convert::From; use std::marker::PhantomData; use std::collections::HashMap; @@ -109,7 +110,7 @@ impl RouteHandler for Resource { #[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))] enum ReplyItem where A: Actor + Route { - Message(HttpRequest, HttpResponse), + Message(HttpResponse), Actor(A), } @@ -124,15 +125,15 @@ impl Reply where A: Actor + Route } /// Send response - pub fn reply>(req: HttpRequest, response: R) -> Self { - Reply(ReplyItem::Message(req, response.into())) + pub fn reply>(response: R) -> Self { + Reply(ReplyItem::Message(response.into())) } pub fn into(self, mut ctx: HttpContext) -> Task where A: Actor> { match self.0 { - ReplyItem::Message(req, msg) => { - Task::reply(req, msg) + ReplyItem::Message(msg) => { + Task::reply(msg) }, ReplyItem::Actor(act) => { ctx.set_actor(act); @@ -141,3 +142,32 @@ impl Reply where A: Actor + Route } } } + +impl From for Reply + where T: Into, A: Actor + Route +{ + fn from(item: T) -> Self { + Reply::reply(item) + } +} + +#[cfg(feature="nightly")] +use std::ops::Try; + +#[cfg(feature="nightly")] +impl Try for Reply where A: Actor + Route { + type Ok = HttpResponse; + type Error = HttpResponse; + + fn into_result(self) -> Result { + panic!("Reply -> Result conversion is not supported") + } + + fn from_error(v: Self::Error) -> Self { + Reply::reply(v) + } + + fn from_ok(v: Self::Ok) -> Self { + Reply::reply(v) + } +} diff --git a/src/route.rs b/src/route.rs index d7c4e015..8e38c554 100644 --- a/src/route.rs +++ b/src/route.rs @@ -14,7 +14,7 @@ use httpmessage::{HttpRequest, HttpResponse}; #[derive(Debug)] #[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))] pub enum Frame { - Message(HttpRequest, HttpResponse), + Message(HttpResponse), Payload(Option), } diff --git a/src/router.rs b/src/router.rs index 731f7370..e86e962f 100644 --- a/src/router.rs +++ b/src/router.rs @@ -138,7 +138,7 @@ impl Router { return app.handle(req, payload) } } - Task::reply(req, HTTPNotFound.response()) + Task::reply(HTTPNotFound.response()) } } } diff --git a/src/server.rs b/src/server.rs index 8e1e3c78..b2243488 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,4 @@ -use std::{io, net}; +use std::{io, net, mem}; use std::rc::Rc; use std::collections::VecDeque; @@ -6,7 +6,7 @@ use actix::dev::*; use futures::{Future, Poll, Async}; use tokio_core::net::{TcpListener, TcpStream}; -use task::Task; +use task::{Task, RequestInfo}; use reader::Reader; use router::{Router, RoutingMap}; @@ -55,6 +55,7 @@ impl Handler<(TcpStream, net::SocketAddr), io::Error> for HttpServer { addr: msg.1, stream: msg.0, reader: Reader::new(), + error: false, items: VecDeque::new(), inactive: Vec::new(), }); @@ -65,6 +66,7 @@ impl Handler<(TcpStream, net::SocketAddr), io::Error> for HttpServer { struct Entry { task: Task, + req: RequestInfo, eof: bool, error: bool, finished: bool, @@ -76,6 +78,7 @@ pub struct HttpChannel { addr: net::SocketAddr, stream: TcpStream, reader: Reader, + error: bool, items: VecDeque, inactive: Vec, } @@ -97,7 +100,13 @@ impl Future for HttpChannel { if self.items[idx].error { return Err(()) } - match self.items[idx].task.poll_io(&mut self.stream) { + + // this is anoying + let req: &RequestInfo = unsafe { + mem::transmute(&self.items[idx].req) + }; + match self.items[idx].task.poll_io(&mut self.stream, req) + { Ok(Async::Ready(val)) => { let mut item = self.items.pop_front().unwrap(); if !val { @@ -107,7 +116,11 @@ impl Future for HttpChannel { continue }, Ok(Async::NotReady) => (), - Err(_) => return Err(()), + Err(_) => { + // it is not possible to recover from error + // during task handling, so just drop connection + return Err(()) + } } } else if !self.items[idx].finished { match self.items[idx].task.poll() { @@ -121,19 +134,32 @@ impl Future for HttpChannel { idx += 1; } + // check for parse error + if self.items.is_empty() && self.error { + + } + // read incoming data - match self.reader.parse(&mut self.stream) { - Ok(Async::Ready((req, payload))) => { - self.items.push_back( - Entry {task: self.router.call(req, payload), - eof: false, - error: false, - finished: false}); - }, - Ok(Async::NotReady) => - return Ok(Async::NotReady), - Err(_) => - return Err(()), + if !self.error { + match self.reader.parse(&mut self.stream) { + Ok(Async::Ready((req, payload))) => { + let info = RequestInfo::new(&req); + self.items.push_back( + Entry {task: self.router.call(req, payload), + req: info, + eof: false, + error: false, + finished: false}); + } + Ok(Async::NotReady) => + return Ok(Async::NotReady), + Err(err) => return Err(()) + //self.items.push_back( + // Entry {task: Task::reply(err), + // eof: false, + // error: false, + // finished: false}) + } } } } diff --git a/src/task.rs b/src/task.rs index 30ecd4f5..846f01da 100644 --- a/src/task.rs +++ b/src/task.rs @@ -44,6 +44,20 @@ impl TaskIOState { } } +pub(crate) struct RequestInfo { + version: Version, + keep_alive: bool, +} + +impl RequestInfo { + pub fn new(req: &HttpRequest) -> Self { + RequestInfo { + version: req.version(), + keep_alive: req.keep_alive(), + } + } +} + pub struct Task { state: TaskRunningState, iostate: TaskIOState, @@ -56,9 +70,9 @@ pub struct Task { impl Task { - pub fn reply>(req: HttpRequest, response: R) -> Self { + pub fn reply>(response: R) -> Self { let mut frames = VecDeque::new(); - frames.push_back(Frame::Message(req, response.into())); + frames.push_back(Frame::Message(response.into())); frames.push_back(Frame::Payload(None)); Task { @@ -86,13 +100,13 @@ impl Task { } } - fn prepare(&mut self, req: HttpRequest, mut msg: HttpResponse) + fn prepare(&mut self, req: &RequestInfo, mut msg: HttpResponse) { trace!("Prepare message status={:?}", msg.status); let mut extra = 0; let body = msg.replace_body(Body::Empty); - let version = msg.version().unwrap_or_else(|| req.version()); + let version = msg.version().unwrap_or_else(|| req.version); match body { Body::Empty => { @@ -124,7 +138,7 @@ impl Task { Body::Streaming => { if msg.chunked() { if version < Version::HTTP_11 { - error!("Chunked transfer encoding is forbidden for {:?}", msg.version); + error!("Chunked transfer encoding is forbidden for {:?}", version); } msg.headers.remove(CONTENT_LENGTH); msg.headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); @@ -144,7 +158,7 @@ impl Task { msg.headers.insert(CONNECTION, HeaderValue::from_static("upgrade")); } // keep-alive - else if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { + else if msg.keep_alive().unwrap_or_else(|| req.keep_alive) { if version < Version::HTTP_11 { msg.headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); } @@ -159,7 +173,7 @@ impl Task { if version == Version::HTTP_11 && msg.status == StatusCode::OK { self.buffer.extend(b"HTTP/1.1 200 OK\r\n"); } else { - let _ = write!(self.buffer, "{:?} {}\r\n", msg.version, msg.status); + let _ = write!(self.buffer, "{:?} {}\r\n", version, msg.status); } for (key, value) in &msg.headers { let t: &[u8] = key.as_ref(); @@ -192,7 +206,7 @@ impl Task { msg.replace_body(body); } - pub(crate) fn poll_io(&mut self, io: &mut TcpStream) -> Poll { + pub(crate) fn poll_io(&mut self, io: &mut TcpStream, info: &RequestInfo) -> Poll { trace!("POLL-IO frames:{:?}", self.frames.len()); // response is completed if self.frames.is_empty() && self.iostate.is_done() { @@ -213,8 +227,8 @@ impl Task { while let Some(frame) = self.frames.pop_front() { trace!("IO Frame: {:?}", frame); match frame { - Frame::Message(request, response) => { - self.prepare(request, response); + Frame::Message(response) => { + self.prepare(info, response); } Frame::Payload(chunk) => { match chunk { @@ -277,7 +291,7 @@ impl Future for Task { match stream.poll() { Ok(Async::Ready(Some(frame))) => { match frame { - Frame::Message(_, ref msg) => { + Frame::Message(ref msg) => { if self.iostate != TaskIOState::ReadingMessage { error!("Non expected frame {:?}", frame); return Err(())