1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

remove unused code

This commit is contained in:
Nikolay Kim 2018-02-26 16:11:00 -08:00
parent d6fd4a3524
commit abae65a49e
2 changed files with 68 additions and 224 deletions

View File

@ -12,9 +12,11 @@ use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use openssl::ssl::{SslMethod, SslConnector, SslVerifyMode, Error as OpensslError}; use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use tokio_openssl::SslConnectorExt; use tokio_openssl::SslConnectorExt;
#[cfg(feature="alpn")]
use futures::Future;
use HAS_OPENSSL; use HAS_OPENSSL;
use server::IoStream; use server::IoStream;
@ -92,7 +94,7 @@ impl Default for ClientConnector {
fn default() -> ClientConnector { fn default() -> ClientConnector {
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
{ {
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); let builder = SslConnector::builder(SslMethod::tls()).unwrap();
ClientConnector { ClientConnector {
connector: builder.build() connector: builder.build()
} }
@ -149,9 +151,7 @@ impl ClientConnector {
/// } /// }
/// ``` /// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector { pub fn with_connector(connector: SslConnector) -> ClientConnector {
ClientConnector { ClientConnector { connector }
connector: connector
}
} }
} }
@ -196,7 +196,7 @@ impl Handler<Connect> for ClientConnector {
if proto.is_secure() { if proto.is_secure() {
fut::Either::A( fut::Either::A(
_act.connector.connect_async(&host, stream) _act.connector.connect_async(&host, stream)
.map_err(|e| ClientConnectorError::SslError(e)) .map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)}) .map(|stream| Connection{stream: Box::new(stream)})
.into_actor(_act)) .into_actor(_act))
} else { } else {

View File

@ -4,7 +4,7 @@ use std::rc::{Rc, Weak};
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Future, Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use error::PayloadError; use error::PayloadError;
@ -62,30 +62,6 @@ impl Payload {
self.inner.borrow().len() == 0 self.inner.borrow().len() == 0
} }
/// Get exact number of bytes
#[inline]
pub fn readexactly(&self, size: usize) -> ReadExactly {
ReadExactly(Rc::clone(&self.inner), size)
}
/// Read until `\n`
#[inline]
pub fn readline(&self) -> ReadLine {
ReadLine(Rc::clone(&self.inner))
}
/// Read until match line
#[inline]
pub fn readuntil(&self, line: &[u8]) -> ReadUntil {
ReadUntil(Rc::clone(&self.inner), line.to_vec())
}
#[doc(hidden)]
#[inline]
pub fn readall(&self) -> Option<Bytes> {
self.inner.borrow_mut().readall()
}
/// Put unused data back to payload /// Put unused data back to payload
#[inline] #[inline]
pub fn unread_data(&mut self, data: Bytes) { pub fn unread_data(&mut self, data: Bytes) {
@ -103,6 +79,11 @@ impl Payload {
pub fn set_buffer_size(&self, size: usize) { pub fn set_buffer_size(&self, size: usize) {
self.inner.borrow_mut().set_buffer_size(size) self.inner.borrow_mut().set_buffer_size(size)
} }
#[cfg(test)]
pub(crate) fn readall(&self) -> Option<Bytes> {
self.inner.borrow_mut().readall()
}
} }
impl Stream for Payload { impl Stream for Payload {
@ -121,51 +102,6 @@ impl Clone for Payload {
} }
} }
/// Get exact number of bytes
pub struct ReadExactly(Rc<RefCell<Inner>>, usize);
impl Future for ReadExactly {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readexactly(self.1, false)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Read until `\n`
pub struct ReadLine(Rc<RefCell<Inner>>);
impl Future for ReadLine {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readline(false)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Read until match line
pub struct ReadUntil(Rc<RefCell<Inner>>, Vec<u8>);
impl Future for ReadUntil {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readuntil(&self.1, false)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Payload writer interface. /// Payload writer interface.
pub trait PayloadWriter { pub trait PayloadWriter {
@ -271,6 +207,22 @@ impl Inner {
self.len self.len
} }
#[cfg(test)]
pub(crate) fn readall(&mut self) -> Option<Bytes> {
let len = self.items.iter().map(|b| b.len()).sum();
if len > 0 {
let mut buf = BytesMut::with_capacity(len);
for item in &self.items {
buf.extend_from_slice(item);
}
self.items = VecDeque::new();
self.len = 0;
Some(buf.take().freeze())
} else {
None
}
}
fn readany(&mut self, notify: bool) -> Poll<Option<Bytes>, PayloadError> { fn readany(&mut self, notify: bool) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
self.len -= data.len(); self.len -= data.len();
@ -287,107 +239,6 @@ impl Inner {
} }
} }
fn readexactly(&mut self, size: usize, notify: bool) -> Result<Async<Bytes>, PayloadError> {
if size <= self.len {
let mut buf = BytesMut::with_capacity(size);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
self.len -= rem;
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
}
}
return Ok(Async::Ready(buf.freeze()))
}
if let Some(err) = self.err.take() {
Err(err)
} else {
if notify {
self.task = Some(current_task());
}
Ok(Async::NotReady)
}
}
fn readuntil(&mut self, line: &[u8], notify: bool) -> Result<Async<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
let mut offset = 0;
let mut found = false;
let mut length = 0;
for no in 0..self.items.len() {
{
let chunk = &self.items[no];
for (pos, ch) in chunk.iter().enumerate() {
if *ch == line[idx] {
idx += 1;
if idx == line.len() {
num = no;
offset = pos+1;
length += pos+1;
found = true;
break;
}
} else {
idx = 0
}
}
if !found {
length += chunk.len()
}
}
if found {
let mut buf = BytesMut::with_capacity(length);
if num > 0 {
for _ in 0..num {
buf.extend_from_slice(&self.items.pop_front().unwrap());
}
}
if offset > 0 {
let mut chunk = self.items.pop_front().unwrap();
buf.extend_from_slice(&chunk.split_to(offset));
if !chunk.is_empty() {
self.items.push_front(chunk)
}
}
self.len -= length;
return Ok(Async::Ready(buf.freeze()))
}
}
if let Some(err) = self.err.take() {
Err(err)
} else {
if notify {
self.task = Some(current_task());
}
Ok(Async::NotReady)
}
}
fn readline(&mut self, notify: bool) -> Result<Async<Bytes>, PayloadError> {
self.readuntil(b"\n", notify)
}
pub fn readall(&mut self) -> Option<Bytes> {
let len = self.items.iter().map(|b| b.len()).sum();
if len > 0 {
let mut buf = BytesMut::with_capacity(len);
for item in &self.items {
buf.extend_from_slice(item);
}
self.items = VecDeque::new();
self.len = 0;
Some(buf.take().freeze())
} else {
None
}
}
fn unread_data(&mut self, data: Bytes) { fn unread_data(&mut self, data: Bytes) {
self.len += data.len(); self.len += data.len();
self.items.push_front(data); self.items.push_front(data);
@ -592,12 +443,11 @@ mod tests {
#[test] #[test]
fn test_basic() { fn test_basic() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (_, mut payload) = Payload::new(false); let (_, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert!(!payload.eof()); assert_eq!(payload.len, 0);
assert!(payload.is_empty()); assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
assert_eq!(payload.len(), 0);
assert_eq!(Async::NotReady, payload.poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -607,23 +457,18 @@ mod tests {
#[test] #[test]
fn test_eof() { fn test_eof() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.poll().ok().unwrap());
assert!(!payload.eof());
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
sender.feed_data(Bytes::from("data")); sender.feed_data(Bytes::from("data"));
sender.feed_eof(); sender.feed_eof();
assert!(!payload.eof());
assert_eq!(Async::Ready(Some(Bytes::from("data"))), assert_eq!(Async::Ready(Some(Bytes::from("data"))),
payload.poll().ok().unwrap()); payload.readany().ok().unwrap());
assert!(payload.is_empty()); assert_eq!(payload.len, 0);
assert!(payload.eof()); assert_eq!(Async::Ready(None), payload.readany().ok().unwrap());
assert_eq!(payload.len(), 0);
assert_eq!(Async::Ready(None), payload.poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
})).unwrap(); })).unwrap();
@ -632,12 +477,13 @@ mod tests {
#[test] #[test]
fn test_err() { fn test_err() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.poll().err().unwrap(); payload.readany().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
})).unwrap(); })).unwrap();
@ -646,21 +492,19 @@ mod tests {
#[test] #[test]
fn test_readany() { fn test_readany() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
assert!(!payload.is_empty());
assert_eq!(payload.len(), 5);
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
assert!(!payload.is_empty());
assert_eq!(payload.len(), 10);
assert_eq!(Async::Ready(Some(Bytes::from("line1"))), assert_eq!(Async::Ready(Some(Bytes::from("line1"))),
payload.poll().ok().unwrap()); payload.readany().ok().unwrap());
assert!(!payload.is_empty()); assert_eq!(payload.len, 0);
assert_eq!(payload.len(), 5);
assert_eq!(Async::Ready(Some(Bytes::from("line2"))),
payload.readany().ok().unwrap());
assert_eq!(payload.len, 0);
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -671,23 +515,23 @@ mod tests {
fn test_readexactly() { fn test_readexactly() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.readexactly(2).ok().unwrap());
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
assert_eq!(payload.len(), 10);
assert_eq!(Async::Ready(Bytes::from("li")), assert_eq!(Async::Ready(Some(BytesMut::from("li"))),
payload.readexactly(2).poll().ok().unwrap()); payload.readexactly(2).ok().unwrap());
assert_eq!(payload.len(), 8); assert_eq!(payload.len, 3);
assert_eq!(Async::Ready(Bytes::from("ne1l")), assert_eq!(Async::Ready(Some(BytesMut::from("ne1l"))),
payload.readexactly(4).poll().ok().unwrap()); payload.readexactly(4).ok().unwrap());
assert_eq!(payload.len(), 4); assert_eq!(payload.len, 4);
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.readexactly(10).poll().err().unwrap(); payload.readexactly(10).err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -698,23 +542,23 @@ mod tests {
fn test_readuntil() { fn test_readuntil() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.readuntil(b"ne").ok().unwrap());
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
assert_eq!(payload.len(), 10);
assert_eq!(Async::Ready(Bytes::from("line")), assert_eq!(Async::Ready(Some(Bytes::from("line"))),
payload.readuntil(b"ne").poll().ok().unwrap()); payload.readuntil(b"ne").ok().unwrap());
assert_eq!(payload.len(), 6); assert_eq!(payload.len, 1);
assert_eq!(Async::Ready(Bytes::from("1line2")), assert_eq!(Async::Ready(Some(Bytes::from("1line2"))),
payload.readuntil(b"2").poll().ok().unwrap()); payload.readuntil(b"2").ok().unwrap());
assert_eq!(payload.len(), 0); assert_eq!(payload.len, 0);
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.readuntil(b"b").poll().err().unwrap(); payload.readuntil(b"b").err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)