1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-12-01 02:54:36 +01:00
actix-web/actix-http/src/h1/payload.rs

265 lines
6.4 KiB
Rust
Raw Normal View History

2017-12-19 09:18:57 +01:00
//! Payload stream
2018-04-14 01:02:01 +02:00
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::{Rc, Weak};
2017-10-09 05:16:48 +02:00
2019-04-03 12:20:20 +02:00
use bytes::Bytes;
2019-03-27 05:54:57 +01:00
use futures::task::current as current_task;
use futures::task::Task;
use futures::{Async, Poll, Stream};
2018-12-06 23:32:52 +01:00
use crate::error::PayloadError;
2017-10-27 08:14:33 +02:00
/// max buffer size 32k
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;
#[derive(Debug, PartialEq)]
pub enum PayloadStatus {
Read,
Pause,
Dropped,
}
2017-12-19 09:29:25 +01:00
/// Buffered stream of bytes chunks
2017-10-09 05:16:48 +02:00
///
2018-04-14 01:02:01 +02:00
/// Payload stores chunks in a vector. First chunk can be received with
/// `.readany()` method. Payload stream is not thread safe. Payload does not
/// notify current task when new data is available.
2017-12-19 09:29:25 +01:00
///
2018-10-05 20:04:59 +02:00
/// Payload stream can be used as `Response` body stream.
2017-10-19 08:43:50 +02:00
#[derive(Debug)]
2017-10-09 05:16:48 +02:00
pub struct Payload {
inner: Rc<RefCell<Inner>>,
}
impl Payload {
2017-12-19 06:58:38 +01:00
/// Create payload stream.
///
2018-04-14 01:02:01 +02:00
/// This method construct two objects responsible for bytes stream
/// generation.
2017-12-19 06:58:38 +01:00
///
/// * `PayloadSender` - *Sender* side of the stream
///
/// * `Payload` - *Receiver* side of the stream
2019-01-29 19:14:00 +01:00
pub fn create(eof: bool) -> (PayloadSender, Payload) {
2017-10-09 05:16:48 +02:00
let shared = Rc::new(RefCell::new(Inner::new(eof)));
2018-04-14 01:02:01 +02:00
(
PayloadSender {
inner: Rc::downgrade(&shared),
},
2018-04-29 18:09:08 +02:00
Payload { inner: shared },
2018-04-14 01:02:01 +02:00
)
2017-10-09 05:16:48 +02:00
}
2017-11-27 04:00:57 +01:00
/// Create empty payload
#[doc(hidden)]
pub fn empty() -> Payload {
2018-04-14 01:02:01 +02:00
Payload {
inner: Rc::new(RefCell::new(Inner::new(true))),
}
2017-11-27 04:00:57 +01:00
}
2017-10-09 05:16:48 +02:00
/// Length of the data in this payload
2018-06-25 06:58:04 +02:00
#[cfg(test)]
2017-10-09 05:16:48 +02:00
pub fn len(&self) -> usize {
self.inner.borrow().len()
}
/// Is payload empty
2018-06-25 06:58:04 +02:00
#[cfg(test)]
2017-10-09 05:16:48 +02:00
pub fn is_empty(&self) -> bool {
self.inner.borrow().len() == 0
}
/// Put unused data back to payload
2018-01-11 06:02:28 +01:00
#[inline]
2017-10-14 01:33:23 +02:00
pub fn unread_data(&mut self, data: Bytes) {
2017-10-09 05:16:48 +02:00
self.inner.borrow_mut().unread_data(data);
}
2017-11-04 17:07:44 +01:00
#[inline]
/// Set read buffer capacity
///
/// Default buffer capacity is 32Kb.
pub fn set_read_buffer_capacity(&mut self, cap: usize) {
self.inner.borrow_mut().capacity = cap;
}
2017-10-09 05:16:48 +02:00
}
impl Stream for Payload {
2018-02-25 09:21:45 +01:00
type Item = Bytes;
2017-10-27 08:14:33 +02:00
type Error = PayloadError;
2017-10-09 05:16:48 +02:00
2018-01-11 06:02:28 +01:00
#[inline]
2018-02-25 09:21:45 +01:00
fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
2018-02-27 05:07:22 +01:00
self.inner.borrow_mut().readany()
2017-12-19 09:18:57 +01:00
}
}
2017-12-19 06:58:38 +01:00
/// Sender part of the payload stream
pub struct PayloadSender {
2017-10-09 05:16:48 +02:00
inner: Weak<RefCell<Inner>>,
}
2019-04-07 19:29:26 +02:00
impl PayloadSender {
2018-03-09 02:19:50 +01:00
#[inline]
2019-04-07 19:29:26 +02:00
pub fn set_error(&mut self, err: PayloadError) {
2017-10-14 01:33:23 +02:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err)
}
}
2018-03-09 02:19:50 +01:00
#[inline]
2019-04-07 19:29:26 +02:00
pub fn feed_eof(&mut self) {
2017-10-09 05:16:48 +02:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof()
}
}
2018-03-09 02:19:50 +01:00
#[inline]
2019-04-07 19:29:26 +02:00
pub fn feed_data(&mut self, data: Bytes) {
2017-10-09 05:16:48 +02:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data)
}
}
2018-02-10 01:20:10 +01:00
#[inline]
2019-04-07 19:29:26 +02:00
pub fn need_read(&self) -> PayloadStatus {
// we check need_read only if Payload (other side) is alive,
// otherwise always return true (consume payload)
2017-11-06 10:24:49 +01:00
if let Some(shared) = self.inner.upgrade() {
if shared.borrow().need_read {
PayloadStatus::Read
} else {
#[cfg(not(test))]
{
if shared.borrow_mut().io_task.is_none() {
shared.borrow_mut().io_task = Some(current_task());
}
}
PayloadStatus::Pause
}
2017-11-06 10:24:49 +01:00
} else {
PayloadStatus::Dropped
2017-11-06 10:24:49 +01:00
}
}
}
2017-10-19 08:43:50 +02:00
#[derive(Debug)]
2017-10-09 05:16:48 +02:00
struct Inner {
len: usize,
eof: bool,
2017-10-14 01:33:23 +02:00
err: Option<PayloadError>,
2018-02-27 05:07:22 +01:00
need_read: bool,
2017-10-09 05:16:48 +02:00
items: VecDeque<Bytes>,
capacity: usize,
task: Option<Task>,
io_task: Option<Task>,
2017-10-09 05:16:48 +02:00
}
impl Inner {
fn new(eof: bool) -> Self {
Inner {
2018-02-26 23:33:56 +01:00
eof,
2017-10-09 05:16:48 +02:00
len: 0,
2017-10-14 01:33:23 +02:00
err: None,
2017-10-09 05:16:48 +02:00
items: VecDeque::new(),
2018-03-03 05:47:23 +01:00
need_read: true,
capacity: MAX_BUFFER_SIZE,
task: None,
io_task: None,
2017-10-09 05:16:48 +02:00
}
}
2018-03-09 02:19:50 +01:00
#[inline]
2017-10-14 01:33:23 +02:00
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
}
2018-03-09 02:19:50 +01:00
#[inline]
2017-10-09 05:16:48 +02:00
fn feed_eof(&mut self) {
self.eof = true;
}
2018-03-09 02:19:50 +01:00
#[inline]
2017-10-09 05:16:48 +02:00
fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_back(data);
self.need_read = self.len < self.capacity;
if let Some(task) = self.task.take() {
task.notify()
}
2017-10-09 05:16:48 +02:00
}
2018-06-25 06:58:04 +02:00
#[cfg(test)]
2017-10-09 05:16:48 +02:00
fn len(&self) -> usize {
self.len
}
2018-02-27 05:07:22 +01:00
fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
2017-10-09 05:16:48 +02:00
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
self.need_read = self.len < self.capacity;
2019-03-27 05:54:57 +01:00
if self.need_read && self.task.is_none() && !self.eof {
self.task = Some(current_task());
}
if let Some(task) = self.io_task.take() {
task.notify()
}
2018-02-25 09:21:45 +01:00
Ok(Async::Ready(Some(data)))
2017-10-14 01:33:23 +02:00
} else if let Some(err) = self.err.take() {
2017-10-27 08:14:33 +02:00
Err(err)
2018-02-10 01:20:10 +01:00
} else if self.eof {
Ok(Async::Ready(None))
2017-10-09 05:16:48 +02:00
} else {
2018-02-27 05:07:22 +01:00
self.need_read = true;
#[cfg(not(test))]
{
if self.task.is_none() {
self.task = Some(current_task());
}
if let Some(task) = self.io_task.take() {
task.notify()
}
}
2017-10-27 08:14:33 +02:00
Ok(Async::NotReady)
2017-10-09 05:16:48 +02:00
}
}
2017-11-04 17:07:44 +01:00
fn unread_data(&mut self, data: Bytes) {
2017-10-09 05:16:48 +02:00
self.len += data.len();
2017-12-21 05:30:54 +01:00
self.items.push_front(data);
2017-10-09 05:16:48 +02:00
}
}
2017-10-23 04:58:50 +02:00
#[cfg(test)]
2017-10-23 06:40:41 +02:00
mod tests {
2017-10-23 04:58:50 +02:00
use super::*;
2018-12-11 03:08:33 +01:00
use actix_rt::Runtime;
2017-10-23 04:58:50 +02:00
use futures::future::{lazy, result};
2017-10-23 05:19:20 +02:00
#[test]
fn test_unread_data() {
2018-05-25 06:03:16 +02:00
Runtime::new()
2018-04-14 01:02:01 +02:00
.unwrap()
2018-05-25 06:03:16 +02:00
.block_on(lazy(|| {
2019-01-29 19:14:00 +01:00
let (_, mut payload) = Payload::create(false);
2018-04-14 01:02:01 +02:00
payload.unread_data(Bytes::from("data"));
assert!(!payload.is_empty());
assert_eq!(payload.len(), 4);
assert_eq!(
Async::Ready(Some(Bytes::from("data"))),
payload.poll().ok().unwrap()
);
let res: Result<(), ()> = Ok(());
result(res)
2018-12-06 23:32:52 +01:00
}))
.unwrap();
2017-10-23 05:19:20 +02:00
}
2017-10-23 04:58:50 +02:00
}