mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-30 18:34:36 +01:00
non-blocking processing for NamedFile
This commit is contained in:
parent
af8875f6ab
commit
42d2a29b1d
@ -6,6 +6,8 @@
|
|||||||
|
|
||||||
* Fix client cookie handling #111
|
* Fix client cookie handling #111
|
||||||
|
|
||||||
|
* Non-blocking processing of a `NamedFile`
|
||||||
|
|
||||||
* Enable compression support for `NamedFile`
|
* Enable compression support for `NamedFile`
|
||||||
|
|
||||||
* Better support for `NamedFile` type
|
* Better support for `NamedFile` type
|
||||||
|
108
src/fs.rs
108
src/fs.rs
@ -1,8 +1,8 @@
|
|||||||
//! Static files support.
|
//! Static files support.
|
||||||
|
|
||||||
// //! TODO: needs to re-implement actual files handling, current impl blocks
|
// //! TODO: needs to re-implement actual files handling, current impl blocks
|
||||||
use std::io;
|
use std::{io, cmp};
|
||||||
use std::io::Read;
|
use std::io::{Read, Seek};
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
use std::fs::{File, DirEntry, Metadata};
|
use std::fs::{File, DirEntry, Metadata};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
@ -12,10 +12,14 @@ use std::time::{SystemTime, UNIX_EPOCH};
|
|||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use std::os::unix::fs::MetadataExt;
|
use std::os::unix::fs::MetadataExt;
|
||||||
|
|
||||||
|
use bytes::{Bytes, BytesMut, BufMut};
|
||||||
use http::{Method, StatusCode};
|
use http::{Method, StatusCode};
|
||||||
|
use futures::{Async, Poll, Future, Stream};
|
||||||
|
use futures_cpupool::{CpuPool, CpuFuture};
|
||||||
use mime_guess::get_mime_type;
|
use mime_guess::get_mime_type;
|
||||||
|
|
||||||
use header;
|
use header;
|
||||||
|
use error::Error;
|
||||||
use param::FromParam;
|
use param::FromParam;
|
||||||
use handler::{Handler, Responder};
|
use handler::{Handler, Responder};
|
||||||
use httpmessage::HttpMessage;
|
use httpmessage::HttpMessage;
|
||||||
@ -31,6 +35,7 @@ pub struct NamedFile {
|
|||||||
file: File,
|
file: File,
|
||||||
md: Metadata,
|
md: Metadata,
|
||||||
modified: Option<SystemTime>,
|
modified: Option<SystemTime>,
|
||||||
|
cpu_pool: Option<CpuPool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NamedFile {
|
impl NamedFile {
|
||||||
@ -48,7 +53,8 @@ impl NamedFile {
|
|||||||
let md = file.metadata()?;
|
let md = file.metadata()?;
|
||||||
let path = path.as_ref().to_path_buf();
|
let path = path.as_ref().to_path_buf();
|
||||||
let modified = md.modified().ok();
|
let modified = md.modified().ok();
|
||||||
Ok(NamedFile{path, file, md, modified})
|
let cpu_pool = None;
|
||||||
|
Ok(NamedFile{path, file, md, modified, cpu_pool})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns reference to the underlying `File` object.
|
/// Returns reference to the underlying `File` object.
|
||||||
@ -76,6 +82,13 @@ impl NamedFile {
|
|||||||
self.path.as_path()
|
self.path.as_path()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns reference to the underlying `File` object.
|
||||||
|
#[inline]
|
||||||
|
pub fn set_cpu_pool(mut self, cpu_pool: CpuPool) -> Self {
|
||||||
|
self.cpu_pool = Some(cpu_pool);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
fn etag(&self) -> Option<header::EntityTag> {
|
fn etag(&self) -> Option<header::EntityTag> {
|
||||||
// This etag format is similar to Apache's.
|
// This etag format is similar to Apache's.
|
||||||
self.modified.as_ref().map(|mtime| {
|
self.modified.as_ref().map(|mtime| {
|
||||||
@ -117,8 +130,8 @@ impl DerefMut for NamedFile {
|
|||||||
/// Returns true if `req` has no `If-Match` header or one which matches `etag`.
|
/// Returns true if `req` has no `If-Match` header or one which matches `etag`.
|
||||||
fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
|
fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
|
||||||
match req.get_header::<header::IfMatch>() {
|
match req.get_header::<header::IfMatch>() {
|
||||||
Err(_) | Ok(header::IfMatch::Any) => true,
|
None | Some(header::IfMatch::Any) => true,
|
||||||
Ok(header::IfMatch::Items(ref items)) => {
|
Some(header::IfMatch::Items(ref items)) => {
|
||||||
if let Some(some_etag) = etag {
|
if let Some(some_etag) = etag {
|
||||||
for item in items {
|
for item in items {
|
||||||
if item.strong_eq(some_etag) {
|
if item.strong_eq(some_etag) {
|
||||||
@ -134,8 +147,8 @@ fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
|
|||||||
/// Returns true if `req` doesn't have an `If-None-Match` header matching `req`.
|
/// Returns true if `req` doesn't have an `If-None-Match` header matching `req`.
|
||||||
fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
|
fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
|
||||||
match req.get_header::<header::IfNoneMatch>() {
|
match req.get_header::<header::IfNoneMatch>() {
|
||||||
Ok(header::IfNoneMatch::Any) => false,
|
Some(header::IfNoneMatch::Any) => false,
|
||||||
Ok(header::IfNoneMatch::Items(ref items)) => {
|
Some(header::IfNoneMatch::Items(ref items)) => {
|
||||||
if let Some(some_etag) = etag {
|
if let Some(some_etag) = etag {
|
||||||
for item in items {
|
for item in items {
|
||||||
if item.weak_eq(some_etag) {
|
if item.weak_eq(some_etag) {
|
||||||
@ -145,7 +158,7 @@ fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
|
|||||||
}
|
}
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
Err(_) => true,
|
None => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,7 +167,7 @@ impl Responder for NamedFile {
|
|||||||
type Item = HttpResponse;
|
type Item = HttpResponse;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn respond_to(mut self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
|
fn respond_to(self, req: HttpRequest) -> Result<HttpResponse, io::Error> {
|
||||||
if *req.method() != Method::GET && *req.method() != Method::HEAD {
|
if *req.method() != Method::GET && *req.method() != Method::HEAD {
|
||||||
return Ok(HttpMethodNotAllowed.build()
|
return Ok(HttpMethodNotAllowed.build()
|
||||||
.header(header::http::CONTENT_TYPE, "text/plain")
|
.header(header::http::CONTENT_TYPE, "text/plain")
|
||||||
@ -168,7 +181,7 @@ impl Responder for NamedFile {
|
|||||||
// check preconditions
|
// check preconditions
|
||||||
let precondition_failed = if !any_match(etag.as_ref(), &req) {
|
let precondition_failed = if !any_match(etag.as_ref(), &req) {
|
||||||
true
|
true
|
||||||
} else if let (Some(ref m), Ok(header::IfUnmodifiedSince(ref since))) =
|
} else if let (Some(ref m), Some(header::IfUnmodifiedSince(ref since))) =
|
||||||
(last_modified, req.get_header())
|
(last_modified, req.get_header())
|
||||||
{
|
{
|
||||||
m > since
|
m > since
|
||||||
@ -179,7 +192,7 @@ impl Responder for NamedFile {
|
|||||||
// check last modified
|
// check last modified
|
||||||
let not_modified = if !none_match(etag.as_ref(), &req) {
|
let not_modified = if !none_match(etag.as_ref(), &req) {
|
||||||
true
|
true
|
||||||
} else if let (Some(ref m), Ok(header::IfModifiedSince(ref since))) =
|
} else if let (Some(ref m), Some(header::IfModifiedSince(ref since))) =
|
||||||
(last_modified, req.get_header())
|
(last_modified, req.get_header())
|
||||||
{
|
{
|
||||||
m <= since
|
m <= since
|
||||||
@ -202,18 +215,71 @@ impl Responder for NamedFile {
|
|||||||
return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap())
|
return Ok(resp.status(StatusCode::NOT_MODIFIED).finish().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.content_length(self.md.len());
|
|
||||||
|
|
||||||
if *req.method() == Method::GET {
|
if *req.method() == Method::GET {
|
||||||
let mut data = Vec::new();
|
let reader = ChunkedReadFile {
|
||||||
let _ = self.file.read_to_end(&mut data);
|
size: self.md.len(),
|
||||||
Ok(resp.body(data).unwrap())
|
offset: 0,
|
||||||
|
cpu_pool: self.cpu_pool.unwrap_or_else(|| req.cpu_pool().clone()),
|
||||||
|
file: Some(self.file),
|
||||||
|
fut: None,
|
||||||
|
};
|
||||||
|
Ok(resp.streaming(reader).unwrap())
|
||||||
} else {
|
} else {
|
||||||
Ok(resp.finish().unwrap())
|
Ok(resp.finish().unwrap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A helper created from a `std::fs::File` which reads the file
|
||||||
|
/// chunk-by-chunk on a `CpuPool`.
|
||||||
|
pub struct ChunkedReadFile {
|
||||||
|
size: u64,
|
||||||
|
offset: u64,
|
||||||
|
cpu_pool: CpuPool,
|
||||||
|
file: Option<File>,
|
||||||
|
fut: Option<CpuFuture<(File, Bytes), io::Error>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for ChunkedReadFile {
|
||||||
|
type Item = Bytes;
|
||||||
|
type Error= Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
|
||||||
|
if self.fut.is_some() {
|
||||||
|
return match self.fut.as_mut().unwrap().poll()? {
|
||||||
|
Async::Ready((file, bytes)) => {
|
||||||
|
self.fut.take();
|
||||||
|
self.file = Some(file);
|
||||||
|
self.offset += bytes.len() as u64;
|
||||||
|
Ok(Async::Ready(Some(bytes)))
|
||||||
|
},
|
||||||
|
Async::NotReady => Ok(Async::NotReady),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let size = self.size;
|
||||||
|
let offset = self.offset;
|
||||||
|
|
||||||
|
if size == offset {
|
||||||
|
Ok(Async::Ready(None))
|
||||||
|
} else {
|
||||||
|
let mut file = self.file.take().expect("Use after completion");
|
||||||
|
self.fut = Some(self.cpu_pool.spawn_fn(move || {
|
||||||
|
let max_bytes = cmp::min(size.saturating_sub(offset), 65_536) as usize;
|
||||||
|
let mut buf = BytesMut::with_capacity(max_bytes);
|
||||||
|
file.seek(io::SeekFrom::Start(offset))?;
|
||||||
|
let nbytes = file.read(unsafe{buf.bytes_mut()})?;
|
||||||
|
if nbytes == 0 {
|
||||||
|
return Err(io::ErrorKind::UnexpectedEof.into())
|
||||||
|
}
|
||||||
|
unsafe{buf.advance_mut(nbytes)};
|
||||||
|
Ok((file, buf.freeze()))
|
||||||
|
}));
|
||||||
|
self.poll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A directory; responds with the generated directory listing.
|
/// A directory; responds with the generated directory listing.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Directory{
|
pub struct Directory{
|
||||||
@ -329,6 +395,7 @@ pub struct StaticFiles {
|
|||||||
accessible: bool,
|
accessible: bool,
|
||||||
index: Option<String>,
|
index: Option<String>,
|
||||||
show_index: bool,
|
show_index: bool,
|
||||||
|
cpu_pool: CpuPool,
|
||||||
_chunk_size: usize,
|
_chunk_size: usize,
|
||||||
_follow_symlinks: bool,
|
_follow_symlinks: bool,
|
||||||
}
|
}
|
||||||
@ -362,6 +429,7 @@ impl StaticFiles {
|
|||||||
accessible: access,
|
accessible: access,
|
||||||
index: None,
|
index: None,
|
||||||
show_index: index,
|
show_index: index,
|
||||||
|
cpu_pool: CpuPool::new(40),
|
||||||
_chunk_size: 0,
|
_chunk_size: 0,
|
||||||
_follow_symlinks: false,
|
_follow_symlinks: false,
|
||||||
}
|
}
|
||||||
@ -409,15 +477,17 @@ impl<S> Handler<S> for StaticFiles {
|
|||||||
Ok(FilesystemElement::Redirect(
|
Ok(FilesystemElement::Redirect(
|
||||||
HttpFound
|
HttpFound
|
||||||
.build()
|
.build()
|
||||||
.header::<_, &str>("LOCATION", &new_path)
|
.header(header::http::LOCATION, new_path.as_str())
|
||||||
.finish().unwrap()))
|
.finish().unwrap()))
|
||||||
} else if self.show_index {
|
} else if self.show_index {
|
||||||
Ok(FilesystemElement::Directory(Directory::new(self.directory.clone(), path)))
|
Ok(FilesystemElement::Directory(
|
||||||
|
Directory::new(self.directory.clone(), path)))
|
||||||
} else {
|
} else {
|
||||||
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
|
Err(io::Error::new(io::ErrorKind::NotFound, "not found"))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Ok(FilesystemElement::File(NamedFile::open(path)?))
|
Ok(FilesystemElement::File(
|
||||||
|
NamedFile::open(path)?.set_cpu_pool(self.cpu_pool.clone())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,8 +26,12 @@ pub trait HttpMessage {
|
|||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
/// Get a header
|
/// Get a header
|
||||||
fn get_header<H: Header>(&self) -> Result<H, ParseError> where Self: Sized {
|
fn get_header<H: Header>(&self) -> Option<H> where Self: Sized {
|
||||||
H::parse(self)
|
if self.headers().contains_key(H::name()) {
|
||||||
|
H::parse(self).ok()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read the request content type. If request does not contain
|
/// Read the request content type. If request does not contain
|
||||||
|
@ -188,7 +188,7 @@ impl<S> HttpRequest<S> {
|
|||||||
/// Default `CpuPool`
|
/// Default `CpuPool`
|
||||||
#[inline]
|
#[inline]
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn cpu_pool(&mut self) -> &CpuPool {
|
pub fn cpu_pool(&self) -> &CpuPool {
|
||||||
self.router().expect("HttpRequest has to have Router instance")
|
self.router().expect("HttpRequest has to have Router instance")
|
||||||
.server_settings().cpu_pool()
|
.server_settings().cpu_pool()
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ use std::collections::VecDeque;
|
|||||||
|
|
||||||
use cookie::{Cookie, CookieJar};
|
use cookie::{Cookie, CookieJar};
|
||||||
use bytes::{Bytes, BytesMut, BufMut};
|
use bytes::{Bytes, BytesMut, BufMut};
|
||||||
|
use futures::Stream;
|
||||||
use http::{StatusCode, Version, HeaderMap, HttpTryFrom, Error as HttpError};
|
use http::{StatusCode, Version, HeaderMap, HttpTryFrom, Error as HttpError};
|
||||||
use http::header::{self, HeaderName, HeaderValue};
|
use http::header::{self, HeaderName, HeaderValue};
|
||||||
use serde_json;
|
use serde_json;
|
||||||
@ -480,6 +481,15 @@ impl HttpResponseBuilder {
|
|||||||
Ok(HttpResponse(Some(response)))
|
Ok(HttpResponse(Some(response)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set a streaming body and generate `HttpResponse`.
|
||||||
|
///
|
||||||
|
/// `HttpResponseBuilder` can not be used after this call.
|
||||||
|
pub fn streaming<S>(&mut self, stream: S) -> Result<HttpResponse, HttpError>
|
||||||
|
where S: Stream<Item=Bytes, Error=Error> + 'static,
|
||||||
|
{
|
||||||
|
self.body(Body::Streaming(Box::new(stream)))
|
||||||
|
}
|
||||||
|
|
||||||
/// Set a json body and generate `HttpResponse`
|
/// Set a json body and generate `HttpResponse`
|
||||||
///
|
///
|
||||||
/// `HttpResponseBuilder` can not be used after this call.
|
/// `HttpResponseBuilder` can not be used after this call.
|
||||||
|
Loading…
Reference in New Issue
Block a user