mirror of
https://github.com/actix/actix-extras.git
synced 2025-06-26 18:37:41 +02:00
complete impl for client request and response
This commit is contained in:
@ -21,12 +21,11 @@ use server::shared::SharedBytes;
|
||||
|
||||
use server::{utils, IoStream};
|
||||
use client::{ClientRequest, ClientRequestBuilder,
|
||||
HttpResponseParser, HttpResponseParserError};
|
||||
HttpResponseParser, HttpResponseParserError, HttpClientWriter};
|
||||
|
||||
use super::Message;
|
||||
use super::proto::{CloseCode, OpCode};
|
||||
use super::frame::Frame;
|
||||
use super::writer::Writer;
|
||||
use super::connect::{TcpConnector, TcpConnectorError};
|
||||
|
||||
/// Websockt client error
|
||||
@ -86,7 +85,7 @@ impl From<HttpResponseParserError> for WsClientError {
|
||||
}
|
||||
}
|
||||
|
||||
type WsFuture<T> = Future<Item=(WsReader<T>, WsWriter<T>), Error=WsClientError>;
|
||||
pub type WsFuture<T> = Future<Item=(WsReader<T>, WsWriter<T>), Error=WsClientError>;
|
||||
|
||||
/// Websockt client
|
||||
pub struct WsClient {
|
||||
@ -190,7 +189,7 @@ impl WsClient {
|
||||
|
||||
struct WsInner<T> {
|
||||
stream: T,
|
||||
writer: Writer,
|
||||
writer: HttpClientWriter,
|
||||
parser: HttpResponseParser,
|
||||
parser_buf: BytesMut,
|
||||
closed: bool,
|
||||
@ -218,7 +217,7 @@ impl<T: IoStream> WsHandshake<T> {
|
||||
|
||||
let inner = WsInner {
|
||||
stream: stream,
|
||||
writer: Writer::new(SharedBytes::default()),
|
||||
writer: HttpClientWriter::new(SharedBytes::default()),
|
||||
parser: HttpResponseParser::default(),
|
||||
parser_buf: BytesMut::new(),
|
||||
closed: false,
|
||||
|
@ -59,18 +59,15 @@ mod frame;
|
||||
mod proto;
|
||||
mod context;
|
||||
mod mask;
|
||||
mod client;
|
||||
|
||||
mod connect;
|
||||
mod writer;
|
||||
|
||||
pub mod client;
|
||||
|
||||
use ws::frame::Frame;
|
||||
use ws::proto::{hash_key, OpCode};
|
||||
pub use ws::proto::CloseCode;
|
||||
pub use ws::context::WebsocketContext;
|
||||
|
||||
pub use self::client::{WsClient, WsClientError, WsReader, WsWriter};
|
||||
use self::frame::Frame;
|
||||
use self::proto::{hash_key, OpCode};
|
||||
pub use self::proto::CloseCode;
|
||||
pub use self::context::WebsocketContext;
|
||||
pub use self::client::{WsClient, WsClientError, WsReader, WsWriter, WsFuture};
|
||||
|
||||
const SEC_WEBSOCKET_ACCEPT: &str = "SEC-WEBSOCKET-ACCEPT";
|
||||
const SEC_WEBSOCKET_KEY: &str = "SEC-WEBSOCKET-KEY";
|
||||
|
153
src/ws/writer.rs
153
src/ws/writer.rs
@ -1,153 +0,0 @@
|
||||
#![allow(dead_code)]
|
||||
use std::io;
|
||||
use bytes::BufMut;
|
||||
use futures::{Async, Poll};
|
||||
use tokio_io::AsyncWrite;
|
||||
// use http::header::{HeaderValue, CONNECTION, DATE};
|
||||
|
||||
use body::Binary;
|
||||
use server::{WriterState, MAX_WRITE_BUFFER_SIZE};
|
||||
use server::shared::SharedBytes;
|
||||
|
||||
use client::ClientRequest;
|
||||
|
||||
|
||||
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
|
||||
|
||||
bitflags! {
|
||||
struct Flags: u8 {
|
||||
const STARTED = 0b0000_0001;
|
||||
const UPGRADE = 0b0000_0010;
|
||||
const KEEPALIVE = 0b0000_0100;
|
||||
const DISCONNECTED = 0b0000_1000;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Writer {
|
||||
flags: Flags,
|
||||
written: u64,
|
||||
headers_size: u32,
|
||||
buffer: SharedBytes,
|
||||
}
|
||||
|
||||
impl Writer {
|
||||
|
||||
pub fn new(buf: SharedBytes) -> Writer {
|
||||
Writer {
|
||||
flags: Flags::empty(),
|
||||
written: 0,
|
||||
headers_size: 0,
|
||||
buffer: buf,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disconnected(&mut self) {
|
||||
self.buffer.take();
|
||||
}
|
||||
|
||||
pub fn keepalive(&self) -> bool {
|
||||
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
||||
}
|
||||
|
||||
fn write_to_stream<T: AsyncWrite>(&mut self, stream: &mut T) -> io::Result<WriterState> {
|
||||
while !self.buffer.is_empty() {
|
||||
match stream.write(self.buffer.as_ref()) {
|
||||
Ok(0) => {
|
||||
self.disconnected();
|
||||
return Ok(WriterState::Done);
|
||||
},
|
||||
Ok(n) => {
|
||||
let _ = self.buffer.split_to(n);
|
||||
},
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
return Ok(WriterState::Pause)
|
||||
} else {
|
||||
return Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
|
||||
impl Writer {
|
||||
|
||||
pub fn start(&mut self, msg: &mut ClientRequest) {
|
||||
// prepare task
|
||||
self.flags.insert(Flags::STARTED);
|
||||
|
||||
// render message
|
||||
{
|
||||
let buffer = self.buffer.get_mut();
|
||||
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
|
||||
|
||||
// status line
|
||||
// helpers::write_status_line(version, msg.status().as_u16(), &mut buffer);
|
||||
// buffer.extend_from_slice(msg.reason().as_bytes());
|
||||
buffer.extend_from_slice(b"GET ");
|
||||
buffer.extend_from_slice(msg.uri().path().as_ref());
|
||||
buffer.extend_from_slice(b" HTTP/1.1\r\n");
|
||||
|
||||
// write headers
|
||||
for (key, value) in msg.headers() {
|
||||
let v = value.as_ref();
|
||||
let k = key.as_str().as_bytes();
|
||||
buffer.reserve(k.len() + v.len() + 4);
|
||||
buffer.put_slice(k);
|
||||
buffer.put_slice(b": ");
|
||||
buffer.put_slice(v);
|
||||
buffer.put_slice(b"\r\n");
|
||||
}
|
||||
|
||||
// using helpers::date is quite a lot faster
|
||||
//if !msg.headers.contains_key(DATE) {
|
||||
// helpers::date(&mut buffer);
|
||||
//} else {
|
||||
// msg eof
|
||||
buffer.extend_from_slice(b"\r\n");
|
||||
//}
|
||||
self.headers_size = buffer.len() as u32;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write(&mut self, payload: &Binary) -> io::Result<WriterState> {
|
||||
self.written += payload.len() as u64;
|
||||
if !self.flags.contains(Flags::DISCONNECTED) {
|
||||
self.buffer.extend_from_slice(payload.as_ref())
|
||||
}
|
||||
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_eof(&mut self) -> io::Result<WriterState> {
|
||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||
Ok(WriterState::Pause)
|
||||
} else {
|
||||
Ok(WriterState::Done)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn poll_completed<T: AsyncWrite>(&mut self, stream: &mut T, shutdown: bool)
|
||||
-> Poll<(), io::Error>
|
||||
{
|
||||
match self.write_to_stream(stream) {
|
||||
Ok(WriterState::Done) => {
|
||||
if shutdown {
|
||||
stream.shutdown()
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
},
|
||||
Ok(WriterState::Pause) => Ok(Async::NotReady),
|
||||
Err(err) => Err(err)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user