1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-24 01:38:20 +02:00

Compare commits

...

34 Commits

Author SHA1 Message Date
Nikolay Kim
6c709b33cc return error on write zero bytes 2018-03-10 10:42:46 -08:00
Nikolay Kim
71b4c07ea4 Fix json content type detection 2018-03-10 10:27:29 -08:00
Nikolay Kim
ac9eba8261 add api doc for Either 2018-03-10 10:12:44 -08:00
Nikolay Kim
cad55f9c80 add Either responder 2018-03-10 09:39:43 -08:00
Nikolay Kim
4263574a58 fix panic in cors if request does not contain origin header and send_wildcard is not set 2018-03-10 08:31:20 -08:00
messense
84ef5ee410 Merge pull request #116 from messense/feature/from-usize-to-keepalive
Impl From<usize> and From<Option<usize>> for KeepAlive
2018-03-10 22:55:55 +08:00
messense
598fb9190d rerun build if USE_SKEPTIC env var changed 2018-03-10 17:53:11 +08:00
messense
9a404a0c03 Impl From<usize> and From<Option<usize>> for KeepAlive 2018-03-10 17:52:50 +08:00
Nikolay Kim
3dd8fdf450 fix guide 2018-03-09 21:40:51 -08:00
Nikolay Kim
05f5ba0084 refactor keep-alive; fixed write to socket for upgraded connection 2018-03-09 16:21:14 -08:00
Nikolay Kim
8169149554 update wstool 2018-03-09 13:12:25 -08:00
Nikolay Kim
8d1de6c497 ws client timeouts 2018-03-09 13:12:14 -08:00
Nikolay Kim
caaace82e3 export symbols 2018-03-09 13:03:15 -08:00
Nikolay Kim
02dd5375a9 aling mask to 8 bytes 2018-03-09 10:25:47 -08:00
Nikolay Kim
717602472a clippy warnings 2018-03-09 10:11:38 -08:00
Nikolay Kim
b56be8e571 write buffer capacity for client 2018-03-09 10:09:13 -08:00
Nikolay Kim
2853086463 add write buffer capacity config 2018-03-09 10:00:15 -08:00
Nikolay Kim
e2107ec6f4 use small vec on hot path 2018-03-09 08:00:44 -08:00
Nikolay Kim
c33caddf57 update tests 2018-03-09 05:50:47 -08:00
Nikolay Kim
db1e04e418 prepare release 2018-03-09 05:42:42 -08:00
Nikolay Kim
f8b8fe3865 add space to cookie header 2018-03-09 05:38:07 -08:00
Nikolay Kim
1c6ddfd34c naming 2018-03-09 05:36:40 -08:00
Nikolay Kim
49e007ff2a move protobuf support to the example 2018-03-09 05:29:06 -08:00
Nikolay Kim
2068eee669 update readme 2018-03-08 20:58:05 -08:00
Nikolay Kim
f3c63e631a add protobuf feature 2018-03-08 20:56:18 -08:00
Nikolay Kim
3f0803a7d3 Merge branch 'master' of github.com:actix/actix-web 2018-03-08 20:39:10 -08:00
Nikolay Kim
f12b613211 more ws optimizations 2018-03-08 20:39:05 -08:00
Nikolay Kim
695c052c58 Merge pull request #115 from kingxsp/master
Add protobuf support
2018-03-08 18:59:18 -08:00
kingxsp
63634be542 Merge branch 'master' into master 2018-03-09 10:22:15 +08:00
Nikolay Kim
f88f1c65b6 update tests 2018-03-08 18:19:46 -08:00
kingxsp
a0b589eb96 Add protobuf support 2018-03-09 10:05:13 +08:00
Nikolay Kim
ebdc983dfe optimize websocket stream 2018-03-08 17:19:50 -08:00
Nikolay Kim
395243a539 another attempt to fix cookie handling 2018-03-08 11:16:54 -08:00
Nikolay Kim
1ab676d7eb bump version and add some tests 2018-03-07 22:40:46 -08:00
41 changed files with 1285 additions and 398 deletions

View File

@@ -59,6 +59,7 @@ script:
cd examples/multipart && cargo check && cd ../..
cd examples/json && cargo check && cd ../..
cd examples/juniper && cargo check && cd ../..
cd examples/protobuf && cargo check && cd ../..
cd examples/state && cargo check && cd ../..
cd examples/template_tera && cargo check && cd ../..
cd examples/diesel && cargo check && cd ../..
@@ -77,7 +78,7 @@ script:
after_success:
- |
if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "nightly" ]]; then
cargo doc --features "alpn, tls" --no-deps &&
cargo doc --features "alpn, tls, session" --no-deps &&
echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html &&
cargo install mdbook &&
cd guide && mdbook build -d ../target/doc/guide && cd .. &&

View File

@@ -1,5 +1,16 @@
# Changes
## 0.4.6 (2018-03-10)
* Fix client cookie handling
* Fix json content type detection
* Fix CORS middleware #117
* Optimize websockets stream support
## 0.4.5 (2018-03-07)
* Fix compression #103 and #104
@@ -12,8 +23,7 @@
* Better support for `NamedFile` type
* Add `ResponseError` impl for `SendRequestError`.
This improves ergonomics of http client.
* Add `ResponseError` impl for `SendRequestError`. This improves ergonomics of the client.
* Add native-tls support for client

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.4.5"
version = "0.4.6"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
readme = "README.md"
@@ -109,6 +109,7 @@ members = [
"examples/diesel",
"examples/r2d2",
"examples/json",
"examples/protobuf",
"examples/hello-world",
"examples/http-proxy",
"examples/multipart",

View File

@@ -10,6 +10,7 @@ Actix web is a simple, pragmatic, extremely fast, web framework for Rust.
* Configurable [request routing](https://actix.github.io/actix-web/guide/qs_5.html)
* Graceful server shutdown
* Multipart streams
* Static assets
* SSL support with openssl or native-tls
* Middlewares ([Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging),
[Session](https://actix.github.io/actix-web/guide/qs_10.html#user-sessions),
@@ -51,13 +52,13 @@ fn main() {
* [Basics](https://github.com/actix/actix-web/tree/master/examples/basics/)
* [Stateful](https://github.com/actix/actix-web/tree/master/examples/state/)
* [Protobuf support](https://github.com/actix/actix-web/tree/master/examples/protobuf/)
* [Multipart streams](https://github.com/actix/actix-web/tree/master/examples/multipart/)
* [Simple websocket session](https://github.com/actix/actix-web/tree/master/examples/websocket/)
* [Tera templates](https://github.com/actix/actix-web/tree/master/examples/template_tera/)
* [Diesel integration](https://github.com/actix/actix-web/tree/master/examples/diesel/)
* [SSL / HTTP/2.0](https://github.com/actix/actix-web/tree/master/examples/tls/)
* [Tcp/Websocket chat](https://github.com/actix/actix-web/tree/master/examples/websocket-chat/)
* [SockJS Server](https://github.com/actix/actix-sockjs)
* [Json](https://github.com/actix/actix-web/tree/master/examples/json/)
You may consider checking out

View File

@@ -6,6 +6,7 @@ use std::{env, fs};
#[cfg(unix)]
fn main() {
println!("cargo:rerun-if-env-changed=USE_SKEPTIC");
let f = env::var("OUT_DIR").unwrap() + "/skeptic-tests.rs";
if env::var("USE_SKEPTIC").is_ok() {
let _ = fs::remove_file(f);

View File

@@ -0,0 +1,16 @@
[package]
name = "protobuf-example"
version = "0.1.0"
authors = ["kingxsp <jin_hb_zh@126.com>"]
[dependencies]
bytes = "0.4"
futures = "0.1"
failure = "0.1"
env_logger = "*"
prost = "0.2.0"
prost-derive = "0.2.0"
actix = "0.5"
actix-web = { path="../../" }

View File

@@ -0,0 +1,66 @@
# just start server and run client.py
# wget https://github.com/google/protobuf/releases/download/v3.5.1/protobuf-python-3.5.1.zip
# unzip protobuf-python-3.5.1.zip.1
# cd protobuf-3.5.1/python/
# python3.6 setup.py install
# pip3.6 install --upgrade pip
# pip3.6 install aiohttp
#!/usr/bin/env python
import test_pb2
import traceback
import sys
import asyncio
import aiohttp
def op():
try:
obj = test_pb2.MyObj()
obj.number = 9
obj.name = 'USB'
#Serialize
sendDataStr = obj.SerializeToString()
#print serialized string value
print('serialized string:', sendDataStr)
#------------------------#
# message transmission #
#------------------------#
receiveDataStr = sendDataStr
receiveData = test_pb2.MyObj()
#Deserialize
receiveData.ParseFromString(receiveDataStr)
print('pares serialize string, return: devId = ', receiveData.number, ', name = ', receiveData.name)
except(Exception, e):
print(Exception, ':', e)
print(traceback.print_exc())
errInfo = sys.exc_info()
print(errInfo[0], ':', errInfo[1])
async def fetch(session):
obj = test_pb2.MyObj()
obj.number = 9
obj.name = 'USB'
async with session.post('http://localhost:8080/', data=obj.SerializeToString(),
headers={"content-type": "application/protobuf"}) as resp:
print(resp.status)
data = await resp.read()
receiveObj = test_pb2.MyObj()
receiveObj.ParseFromString(data)
print(receiveObj)
async def go(loop):
obj = test_pb2.MyObj()
obj.number = 9
obj.name = 'USB'
async with aiohttp.ClientSession(loop=loop) as session:
await fetch(session)
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()

View File

@@ -0,0 +1,55 @@
extern crate actix;
extern crate actix_web;
extern crate bytes;
extern crate futures;
#[macro_use]
extern crate failure;
extern crate env_logger;
extern crate prost;
#[macro_use]
extern crate prost_derive;
use actix_web::*;
use futures::Future;
mod protobuf;
use protobuf::ProtoBufResponseBuilder;
#[derive(Clone, Debug, PartialEq, Message)]
pub struct MyObj {
#[prost(int32, tag="1")]
pub number: i32,
#[prost(string, tag="2")]
pub name: String,
}
/// This handler uses `ProtoBufMessage` for loading protobuf object.
fn index(req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
protobuf::ProtoBufMessage::new(req)
.from_err() // convert all errors into `Error`
.and_then(|val: MyObj| {
println!("model: {:?}", val);
Ok(httpcodes::HTTPOk.build().protobuf(val)?) // <- send response
})
.responder()
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init();
let sys = actix::System::new("protobuf-example");
let addr = HttpServer::new(|| {
Application::new()
.middleware(middleware::Logger::default())
.resource("/", |r| r.method(Method::POST).f(index))})
.bind("127.0.0.1:8080").unwrap()
.shutdown_timeout(1)
.start();
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run();
}

View File

@@ -0,0 +1,169 @@
use bytes::{Bytes, BytesMut};
use futures::{Poll, Future, Stream};
use bytes::IntoBuf;
use prost::Message;
use prost::DecodeError as ProtoBufDecodeError;
use prost::EncodeError as ProtoBufEncodeError;
use actix_web::header::http::{CONTENT_TYPE, CONTENT_LENGTH};
use actix_web::{Responder, HttpMessage, HttpRequest, HttpResponse};
use actix_web::dev::HttpResponseBuilder;
use actix_web::error::{Error, PayloadError, ResponseError};
use actix_web::httpcodes::{HttpBadRequest, HttpPayloadTooLarge};
#[derive(Fail, Debug)]
pub enum ProtoBufPayloadError {
/// Payload size is bigger than 256k
#[fail(display="Payload size is bigger than 256k")]
Overflow,
/// Content type error
#[fail(display="Content type error")]
ContentType,
/// Serialize error
#[fail(display="ProtoBud serialize error: {}", _0)]
Serialize(#[cause] ProtoBufEncodeError),
/// Deserialize error
#[fail(display="ProtoBud deserialize error: {}", _0)]
Deserialize(#[cause] ProtoBufDecodeError),
/// Payload error
#[fail(display="Error that occur during reading payload: {}", _0)]
Payload(#[cause] PayloadError),
}
impl ResponseError for ProtoBufPayloadError {
fn error_response(&self) -> HttpResponse {
match *self {
ProtoBufPayloadError::Overflow => HttpPayloadTooLarge.into(),
_ => HttpBadRequest.into(),
}
}
}
impl From<PayloadError> for ProtoBufPayloadError {
fn from(err: PayloadError) -> ProtoBufPayloadError {
ProtoBufPayloadError::Payload(err)
}
}
impl From<ProtoBufDecodeError> for ProtoBufPayloadError {
fn from(err: ProtoBufDecodeError) -> ProtoBufPayloadError {
ProtoBufPayloadError::Deserialize(err)
}
}
#[derive(Debug)]
pub struct ProtoBuf<T: Message>(pub T);
impl<T: Message> Responder for ProtoBuf<T> {
type Item = HttpResponse;
type Error = Error;
fn respond_to(self, _: HttpRequest) -> Result<HttpResponse, Error> {
let mut buf = Vec::new();
self.0.encode(&mut buf)
.map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e)))
.and_then(|()| {
Ok(HttpResponse::Ok()
.content_type("application/protobuf")
.body(buf)
.into())
})
}
}
pub struct ProtoBufMessage<T, U: Message + Default>{
limit: usize,
ct: &'static str,
req: Option<T>,
fut: Option<Box<Future<Item=U, Error=ProtoBufPayloadError>>>,
}
impl<T, U: Message + Default> ProtoBufMessage<T, U> {
/// Create `ProtoBufMessage` for request.
pub fn new(req: T) -> Self {
ProtoBufMessage{
limit: 262_144,
req: Some(req),
fut: None,
ct: "application/protobuf",
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
/// Set allowed content type.
///
/// By default *application/protobuf* content type is used. Set content type
/// to empty string if you want to disable content type check.
pub fn content_type(mut self, ct: &'static str) -> Self {
self.ct = ct;
self
}
}
impl<T, U: Message + Default + 'static> Future for ProtoBufMessage<T, U>
where T: HttpMessage + Stream<Item=Bytes, Error=PayloadError> + 'static
{
type Item = U;
type Error = ProtoBufPayloadError;
fn poll(&mut self) -> Poll<U, ProtoBufPayloadError> {
if let Some(req) = self.req.take() {
if let Some(len) = req.headers().get(CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<usize>() {
if len > self.limit {
return Err(ProtoBufPayloadError::Overflow);
}
} else {
return Err(ProtoBufPayloadError::Overflow);
}
}
}
// check content-type
if !self.ct.is_empty() && req.content_type() != self.ct {
return Err(ProtoBufPayloadError::ContentType)
}
let limit = self.limit;
let fut = req.from_err()
.fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(ProtoBufPayloadError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.and_then(|body| Ok(<U>::decode(&mut body.into_buf())?));
self.fut = Some(Box::new(fut));
}
self.fut.as_mut().expect("ProtoBufBody could not be used second time").poll()
}
}
pub trait ProtoBufResponseBuilder {
fn protobuf<T: Message>(&mut self, value: T) -> Result<HttpResponse, Error>;
}
impl ProtoBufResponseBuilder for HttpResponseBuilder {
fn protobuf<T: Message>(&mut self, value: T) -> Result<HttpResponse, Error> {
self.header(CONTENT_TYPE, "application/protobuf");
let mut body = Vec::new();
value.encode(&mut body).map_err(|e| ProtoBufPayloadError::Serialize(e))?;
Ok(self.body(body)?)
}
}

View File

@@ -0,0 +1,6 @@
syntax = "proto3";
message MyObj {
int32 number = 1;
string name = 2;
}

View File

@@ -0,0 +1,76 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: test.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='test.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\ntest.proto\"%\n\x05MyObj\x12\x0e\n\x06number\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\tb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_MYOBJ = _descriptor.Descriptor(
name='MyObj',
full_name='MyObj',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='number', full_name='MyObj.number', index=0,
number=1, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='name', full_name='MyObj.name', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=14,
serialized_end=51,
)
DESCRIPTOR.message_types_by_name['MyObj'] = _MYOBJ
MyObj = _reflection.GeneratedProtocolMessageType('MyObj', (_message.Message,), dict(
DESCRIPTOR = _MYOBJ,
__module__ = 'test_pb2'
# @@protoc_insertion_point(class_scope:MyObj)
))
_sym_db.RegisterMessage(MyObj)
# @@protoc_insertion_point(module_scope)

View File

@@ -134,9 +134,10 @@ for full example.
Actix can wait for requests on a keep-alive connection. *Keep alive*
connection behavior is defined by server settings.
* `Some(75)` - enable 75 sec *keep alive* timer according request and response settings.
* `Some(0)` - disable *keep alive*.
* `None` - Use `SO_KEEPALIVE` socket option.
* `75` or `Some(75)` or `KeepAlive::Timeout(75)` - enable 75 sec *keep alive* timer according
request and response settings.
* `None` or `KeepAlive::Disabled` - disable *keep alive*.
* `KeepAlive::Tcp(75)` - Use `SO_KEEPALIVE` socket option.
```rust
# extern crate actix_web;
@@ -147,7 +148,17 @@ fn main() {
HttpServer::new(||
Application::new()
.resource("/", |r| r.h(httpcodes::HttpOk)))
.keep_alive(None); // <- Use `SO_KEEPALIVE` socket option.
.keep_alive(75); // <- Set keep-alive to 75 seconds
HttpServer::new(||
Application::new()
.resource("/", |r| r.h(httpcodes::HttpOk)))
.keep_alive(server::KeepAlive::Tcp(75)); // <- Use `SO_KEEPALIVE` socket option.
HttpServer::new(||
Application::new()
.resource("/", |r| r.h(httpcodes::HttpOk)))
.keep_alive(None); // <- Disable keep-alive
}
```

View File

@@ -235,6 +235,43 @@ fn main() {
Both methods could be combined. (i.e Async response with streaming body)
## Different return types (Either)
Sometimes you need to return different types of responses. For example
you can do error check and return error, otherwise return async response.
Or any result that requires two different types.
For this case [*Either*](../actix_web/enum.Either.html) type can be used.
```rust
# extern crate actix_web;
# extern crate futures;
# use actix_web::*;
# use futures::future::Future;
use futures::future::result;
use actix_web::{Either, Error, HttpResponse, httpcodes};
type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
fn index(req: HttpRequest) -> RegisterResult {
if true { // <- choose variant A
Either::A(
httpcodes::HttpBadRequest.with_body("Bad data"))
} else {
Either::B( // <- variant B
result(HttpResponse::Ok()
.content_type("text/html")
.body(format!("Hello!"))
.map_err(|e| e.into())).responder())
}
}
fn main() {
Application::new()
.resource("/register", |r| r.f(index))
.finish();
}
```
## Tokio core handle
Any actix web handler runs within properly configured

View File

@@ -77,7 +77,7 @@ pub enum ClientConnectorError {
#[fail(display = "{}", _0)]
Connector(#[cause] ConnectorError),
/// Connecting took too long
/// Connection took too long
#[fail(display = "Timeout out while establishing connection")]
Timeout,

View File

@@ -78,7 +78,6 @@ impl HttpResponseParser {
-> Poll<Option<Bytes>, PayloadError>
where T: IoStream
{
println!("PARSE payload, {:?}", self.decoder.is_some());
if self.decoder.is_some() {
loop {
// read payload

View File

@@ -1,4 +1,5 @@
use std::{fmt, mem};
use std::fmt::Write as FmtWrite;
use std::io::Write;
use actix::{Addr, Unsync};
@@ -8,6 +9,7 @@ use http::{uri, HeaderMap, Method, Version, Uri, HttpTryFrom, Error as HttpError
use http::header::{self, HeaderName, HeaderValue};
use serde_json;
use serde::Serialize;
use percent_encoding::{USERINFO_ENCODE_SET, percent_encode};
use body::Body;
use error::Error;
@@ -26,7 +28,7 @@ pub struct ClientRequest {
upgrade: bool,
encoding: ContentEncoding,
response_decompress: bool,
buffer_capacity: Option<(usize, usize)>,
buffer_capacity: usize,
conn: ConnectionType,
}
@@ -49,7 +51,7 @@ impl Default for ClientRequest {
upgrade: false,
encoding: ContentEncoding::Auto,
response_decompress: true,
buffer_capacity: None,
buffer_capacity: 32_768,
conn: ConnectionType::Default,
}
}
@@ -177,7 +179,8 @@ impl ClientRequest {
self.response_decompress
}
pub fn buffer_capacity(&self) -> Option<(usize, usize)> {
/// Requested write buffer capacity
pub fn write_buffer_capacity(&self) -> usize {
self.buffer_capacity
}
@@ -464,12 +467,11 @@ impl ClientRequestBuilder {
}
/// Set write buffer capacity
pub fn buffer_capacity(&mut self,
low_watermark: usize,
high_watermark: usize) -> &mut Self
{
///
/// Default buffer capacity is 32kb
pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) {
parts.buffer_capacity = Some((low_watermark, high_watermark));
parts.buffer_capacity = cap;
}
self
}
@@ -539,10 +541,14 @@ impl ClientRequestBuilder {
// set cookies
if let Some(ref mut jar) = self.cookies {
for cookie in jar.delta() {
request.headers.append(
header::COOKIE, HeaderValue::from_str(&cookie.to_string()).unwrap());
let mut cookie = String::new();
for c in jar.delta() {
let name = percent_encode(c.name().as_bytes(), USERINFO_ENCODE_SET);
let value = percent_encode(c.value().as_bytes(), USERINFO_ENCODE_SET);
let _ = write!(&mut cookie, "; {}={}", name, value);
}
request.headers.insert(
header::COOKIE, HeaderValue::from_str(&cookie.as_str()[2..]).unwrap());
}
request.body = body.into();
Ok(request)

View File

@@ -129,3 +129,20 @@ impl Stream for ClientResponse {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_debug() {
let resp = ClientResponse::new(ClientMessage::default());
resp.as_mut().headers.insert(
header::COOKIE, HeaderValue::from_static("cookie1=value1"));
resp.as_mut().headers.insert(
header::COOKIE, HeaderValue::from_static("cookie2=value2"));
let dbg = format!("{:?}", resp);
assert!(dbg.contains("ClientResponse"));
}
}

View File

@@ -24,8 +24,6 @@ use server::encoding::{ContentEncoder, TransferEncoding};
use client::ClientRequest;
const LOW_WATERMARK: usize = 1024;
const HIGH_WATERMARK: usize = 8 * LOW_WATERMARK;
const AVERAGE_HEADER_SIZE: usize = 30;
bitflags! {
@@ -42,9 +40,8 @@ pub(crate) struct HttpClientWriter {
written: u64,
headers_size: u32,
buffer: SharedBytes,
buffer_capacity: usize,
encoder: ContentEncoder,
low: usize,
high: usize,
}
impl HttpClientWriter {
@@ -55,10 +52,9 @@ impl HttpClientWriter {
flags: Flags::empty(),
written: 0,
headers_size: 0,
buffer_capacity: 0,
buffer,
encoder,
low: LOW_WATERMARK,
high: HIGH_WATERMARK,
}
}
@@ -70,12 +66,6 @@ impl HttpClientWriter {
// self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
// }
/// Set write buffer capacity
pub fn set_buffer_capacity(&mut self, low_watermark: usize, high_watermark: usize) {
self.low = low_watermark;
self.high = high_watermark;
}
fn write_to_stream<T: AsyncWrite>(&mut self, stream: &mut T) -> io::Result<WriterState> {
while !self.buffer.is_empty() {
match stream.write(self.buffer.as_ref()) {
@@ -87,7 +77,7 @@ impl HttpClientWriter {
let _ = self.buffer.split_to(n);
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.buffer.len() > self.high {
if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@@ -106,9 +96,6 @@ impl HttpClientWriter {
// prepare task
self.flags.insert(Flags::STARTED);
self.encoder = content_encoder(self.buffer.clone(), msg);
if let Some(capacity) = msg.buffer_capacity() {
self.set_buffer_capacity(capacity.0, capacity.1);
}
// render message
{
@@ -153,6 +140,8 @@ impl HttpClientWriter {
self.written += bytes.len() as u64;
self.encoder.write(bytes)?;
}
} else {
self.buffer_capacity = msg.write_buffer_capacity();
}
}
Ok(())
@@ -168,7 +157,7 @@ impl HttpClientWriter {
}
}
if self.buffer.len() > self.high {
if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@@ -34,6 +34,61 @@ pub trait Responder {
fn respond_to(self, req: HttpRequest) -> Result<Self::Item, Self::Error>;
}
/// Combines two different responders types into a single type
///
/// ```rust
/// # extern crate actix_web;
/// # extern crate futures;
/// # use futures::future::Future;
/// use actix_web::AsyncResponder;
/// use futures::future::result;
/// use actix_web::{Either, Error, HttpRequest, HttpResponse, httpcodes};
///
/// type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
///
/// fn index(req: HttpRequest) -> RegisterResult {
/// if true { // <- choose variant A
/// Either::A(
/// httpcodes::HttpBadRequest.with_body("Bad data"))
/// } else {
/// Either::B( // <- variant B
/// result(HttpResponse::Ok()
/// .content_type("text/html")
/// .body(format!("Hello!"))
/// .map_err(|e| e.into())).responder())
/// }
/// }
/// # fn main() {}
/// ```
#[derive(Debug)]
pub enum Either<A, B> {
/// First branch of the type
A(A),
/// Second branch of the type
B(B),
}
impl<A, B> Responder for Either<A, B>
where A: Responder, B: Responder
{
type Item = Reply;
type Error = Error;
fn respond_to(self, req: HttpRequest) -> Result<Reply, Error> {
match self {
Either::A(a) => match a.respond_to(req) {
Ok(val) => Ok(val.into()),
Err(err) => Err(err.into()),
},
Either::B(b) => match b.respond_to(req) {
Ok(val) => Ok(val.into()),
Err(err) => Err(err.into()),
},
}
}
}
#[doc(hidden)]
/// Convenience trait that convert `Future` object into `Boxed` future
pub trait AsyncResponder<I, E>: Sized {

View File

@@ -97,7 +97,8 @@ impl HttpRequest<()> {
/// Construct a new Request.
#[inline]
pub fn new(method: Method, uri: Uri,
version: Version, headers: HeaderMap, payload: Option<Payload>) -> HttpRequest
version: Version, headers: HeaderMap, payload: Option<Payload>)
-> HttpRequest
{
HttpRequest(
SharedHttpInnerMessage::from_message(HttpInnerMessage {
@@ -120,7 +121,7 @@ impl HttpRequest<()> {
}
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[cfg_attr(feature="cargo-clippy", allow(inline_always))]
pub(crate) fn from_message(msg: SharedHttpInnerMessage) -> HttpRequest {
HttpRequest(msg, None, None)
}
@@ -335,7 +336,11 @@ impl<S> HttpRequest<S> {
let mut cookies = Vec::new();
for hdr in msg.headers.get_all(header::COOKIE) {
let s = str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?;
cookies.push(Cookie::parse_encoded(s)?.into_owned());
for cookie_str in s.split(';').map(|s| s.trim()) {
if !cookie_str.is_empty() {
cookies.push(Cookie::parse_encoded(cookie_str)?.into_owned());
}
}
}
msg.cookies = Some(cookies)
}

View File

@@ -18,6 +18,10 @@ use handler::Responder;
use header::{Header, IntoHeaderValue, ContentEncoding};
use httprequest::HttpRequest;
/// max write buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
/// Represents various types of connection
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum ConnectionType {
@@ -198,6 +202,16 @@ impl HttpResponse {
pub(crate) fn set_response_size(&mut self, size: u64) {
self.get_mut().response_size = size;
}
/// Set write buffer capacity
pub fn write_buffer_capacity(&self) -> usize {
self.get_ref().write_capacity
}
/// Set write buffer capacity
pub fn set_write_buffer_capacity(&mut self, cap: usize) {
self.get_mut().write_capacity = cap;
}
}
impl fmt::Debug for HttpResponse {
@@ -462,6 +476,20 @@ impl HttpResponseBuilder {
self
}
/// Set write buffer capacity
///
/// This parameter makes sense only for streaming response
/// or actor. If write buffer reaches specified capacity, stream or actor get
/// paused.
///
/// Default write buffer capacity is 64kb
pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.write_capacity = cap;
}
self
}
/// Set a body and generate `HttpResponse`.
///
/// `HttpResponseBuilder` can not be used after this call.
@@ -692,6 +720,7 @@ struct InnerHttpResponse {
chunked: Option<bool>,
encoding: Option<ContentEncoding>,
connection_type: Option<ConnectionType>,
write_capacity: usize,
response_size: u64,
error: Option<Error>,
}
@@ -710,6 +739,7 @@ impl InnerHttpResponse {
encoding: None,
connection_type: None,
response_size: 0,
write_capacity: MAX_WRITE_BUFFER_SIZE,
error: None,
}
}
@@ -763,6 +793,7 @@ impl Pool {
inner.connection_type = None;
inner.response_size = 0;
inner.error = None;
inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
v.push_front(inner);
}
})
@@ -781,7 +812,10 @@ mod tests {
#[test]
fn test_debug() {
let resp = HttpResponse::Ok().finish().unwrap();
let resp = HttpResponse::Ok()
.header(COOKIE, HeaderValue::from_static("cookie1=value1; "))
.header(COOKIE, HeaderValue::from_static("cookie2=value2; "))
.finish().unwrap();
let dbg = format!("{:?}", resp);
assert!(dbg.contains("HttpResponse"));
}
@@ -789,8 +823,8 @@ mod tests {
#[test]
fn test_response_cookies() {
let mut headers = HeaderMap::new();
headers.insert(COOKIE,
HeaderValue::from_static("cookie1=value1; cookie2=value2"));
headers.insert(COOKIE, HeaderValue::from_static("cookie1=value1"));
headers.insert(COOKIE, HeaderValue::from_static("cookie2=value2"));
let req = HttpRequest::new(
Method::GET, Uri::from_str("/").unwrap(), Version::HTTP_11, headers, None);
@@ -813,7 +847,7 @@ mod tests {
let mut val: Vec<_> = resp.headers().get_all("Set-Cookie")
.iter().map(|v| v.to_str().unwrap().to_owned()).collect();
val.sort();
assert!(val[0].starts_with("cookie1=; Max-Age=0;"));
assert!(val[0].starts_with("cookie2=; Max-Age=0;"));
assert_eq!(
val[1],"name=value; HttpOnly; Path=/test; Domain=www.rust-lang.org; Max-Age=86400");
}

View File

@@ -2,6 +2,7 @@ use bytes::{Bytes, BytesMut};
use futures::{Poll, Future, Stream};
use http::header::CONTENT_LENGTH;
use mime;
use serde_json;
use serde::Serialize;
use serde::de::DeserializeOwned;
@@ -82,7 +83,6 @@ impl<T: Serialize> Responder for Json<T> {
/// ```
pub struct JsonBody<T, U: DeserializeOwned>{
limit: usize,
ct: &'static str,
req: Option<T>,
fut: Option<Box<Future<Item=U, Error=JsonPayloadError>>>,
}
@@ -95,7 +95,6 @@ impl<T, U: DeserializeOwned> JsonBody<T, U> {
limit: 262_144,
req: Some(req),
fut: None,
ct: "application/json",
}
}
@@ -104,15 +103,6 @@ impl<T, U: DeserializeOwned> JsonBody<T, U> {
self.limit = limit;
self
}
/// Set allowed content type.
///
/// By default *application/json* content type is used. Set content type
/// to empty string if you want to disable content type check.
pub fn content_type(mut self, ct: &'static str) -> Self {
self.ct = ct;
self
}
}
impl<T, U: DeserializeOwned + 'static> Future for JsonBody<T, U>
@@ -135,7 +125,13 @@ impl<T, U: DeserializeOwned + 'static> Future for JsonBody<T, U>
}
}
// check content-type
if !self.ct.is_empty() && req.content_type() != self.ct {
let json = if let Ok(Some(mime)) = req.mime_type() {
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
} else {
false
};
if !json {
return Err(JsonPayloadError::ContentType)
}
@@ -200,8 +196,8 @@ mod tests {
let mut req = HttpRequest::default();
req.headers_mut().insert(header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"));
let mut json = req.json::<MyObject>().content_type("text/json");
header::HeaderValue::from_static("application/text"));
let mut json = req.json::<MyObject>();
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType);
let mut req = HttpRequest::default();

View File

@@ -136,7 +136,7 @@ pub use application::Application;
pub use httpmessage::HttpMessage;
pub use httprequest::HttpRequest;
pub use httpresponse::HttpResponse;
pub use handler::{Reply, Responder, NormalizePath, AsyncResponder};
pub use handler::{Either, Reply, Responder, NormalizePath, AsyncResponder};
pub use route::Route;
pub use resource::Resource;
pub use context::HttpContext;

View File

@@ -349,8 +349,7 @@ impl<S> Middleware<S> for Cors {
if self.send_wildcard {
resp.headers_mut().insert(
header::ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
} else {
let origin = req.headers().get(header::ORIGIN).unwrap();
} else if let Some(origin) = req.headers().get(header::ORIGIN) {
resp.headers_mut().insert(
header::ACCESS_CONTROL_ALLOW_ORIGIN, origin.clone());
}
@@ -807,6 +806,25 @@ mod tests {
assert!(cors.start(&mut req).unwrap().is_done());
}
#[test]
fn test_no_origin_response() {
let cors = Cors::build().finish().unwrap();
let mut req = TestRequest::default().method(Method::GET).finish();
let resp: HttpResponse = HttpOk.into();
let resp = cors.response(&mut req, resp).unwrap().response();
assert!(resp.headers().get(header::ACCESS_CONTROL_ALLOW_ORIGIN).is_none());
let mut req = TestRequest::with_header(
"Origin", "https://www.example.com")
.method(Method::OPTIONS)
.finish();
let resp = cors.response(&mut req, resp).unwrap().response();
assert_eq!(
&b"https://www.example.com"[..],
resp.headers().get(header::ACCESS_CONTROL_ALLOW_ORIGIN).unwrap().as_bytes());
}
#[test]
fn test_response() {
let cors = Cors::build()

View File

@@ -130,7 +130,7 @@ impl<S> InnerMultipart<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn read_headers(payload: &mut PayloadHelper<S>) -> Poll<HeaderMap, MultipartError>
{
match payload.readuntil(b"\r\n\r\n")? {
match payload.read_until(b"\r\n\r\n")? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(bytes)) => {
@@ -469,23 +469,23 @@ impl<S> InnerField<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn read_stream(payload: &mut PayloadHelper<S>, boundary: &str)
-> Poll<Option<Bytes>, MultipartError>
{
match payload.readuntil(b"\r")? {
match payload.read_until(b"\r")? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(mut chunk)) => {
if chunk.len() == 1 {
payload.unread_data(chunk);
match payload.readexactly(boundary.len() + 4)? {
match payload.read_exact(boundary.len() + 4)? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(chunk)) => {
if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" &&
&chunk[4..] == boundary.as_bytes()
{
payload.unread_data(chunk.freeze());
payload.unread_data(chunk);
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(chunk.freeze())))
Ok(Async::Ready(Some(chunk)))
}
}
}

View File

@@ -117,18 +117,21 @@ pub struct PayloadSender {
impl PayloadWriter for PayloadSender {
#[inline]
fn set_error(&mut self, err: PayloadError) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err)
}
}
#[inline]
fn feed_eof(&mut self) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof()
}
}
#[inline]
fn feed_data(&mut self, data: Bytes) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data)
@@ -172,24 +175,29 @@ impl Inner {
}
}
#[inline]
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
}
#[inline]
fn feed_eof(&mut self) {
self.eof = true;
}
#[inline]
fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.need_read = false;
self.items.push_back(data);
}
#[inline]
fn eof(&self) -> bool {
self.items.is_empty() && self.eof
}
#[inline]
fn len(&self) -> usize {
self.len
}
@@ -247,6 +255,7 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
}
#[inline]
fn poll_stream(&mut self) -> Poll<bool, PayloadError> {
self.stream.poll().map(|res| {
match res {
@@ -261,6 +270,7 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
})
}
#[inline]
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
@@ -274,25 +284,85 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
}
pub fn readexactly(&mut self, size: usize) -> Poll<Option<BytesMut>, PayloadError> {
#[inline]
pub fn can_read(&mut self, size: usize) -> Poll<Option<bool>, PayloadError> {
if size <= self.len {
let mut buf = BytesMut::with_capacity(size);
while buf.len() < size {
Ok(Async::Ready(Some(true)))
} else {
match self.poll_stream()? {
Async::Ready(true) => self.can_read(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[inline]
pub fn get_chunk(&mut self) -> Poll<Option<&[u8]>, PayloadError> {
if self.items.is_empty() {
match self.poll_stream()? {
Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady),
}
}
match self.items.front().map(|c| c.as_ref()) {
Some(chunk) => Ok(Async::Ready(Some(chunk))),
None => Ok(Async::NotReady),
}
}
#[inline]
pub fn read_exact(&mut self, size: usize) -> Poll<Option<Bytes>, PayloadError> {
if size <= self.len {
self.len -= size;
let mut chunk = self.items.pop_front().unwrap();
if size < chunk.len() {
let buf = chunk.split_to(size);
self.items.push_front(chunk);
Ok(Async::Ready(Some(buf)))
}
else if size == chunk.len() {
Ok(Async::Ready(Some(chunk)))
}
else {
let mut buf = BytesMut::with_capacity(size);
buf.extend_from_slice(&chunk);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
}
}
Ok(Async::Ready(Some(buf.freeze())))
}
} else {
match self.poll_stream()? {
Async::Ready(true) => self.read_exact(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[inline]
pub fn drop_payload(&mut self, size: usize) {
if size <= self.len {
self.len -= size;
let mut len = 0;
while len < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
self.len -= rem;
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
let rem = cmp::min(size-len, chunk.len());
len += rem;
if rem < chunk.len() {
chunk.split_to(rem);
self.items.push_front(chunk);
}
}
return Ok(Async::Ready(Some(buf)))
}
match self.poll_stream()? {
Async::Ready(true) => self.readexactly(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
@@ -317,7 +387,7 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
}
pub fn readuntil(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
pub fn read_until(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
let mut offset = 0;
@@ -366,14 +436,14 @@ impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
match self.poll_stream()? {
Async::Ready(true) => self.readuntil(line),
Async::Ready(true) => self.read_until(line),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
pub fn readline(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.readuntil(b"\n")
self.read_until(b"\n")
}
pub fn unread_data(&mut self, data: Bytes) {
@@ -486,21 +556,21 @@ mod tests {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.readexactly(2).ok().unwrap());
assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
assert_eq!(Async::Ready(Some(BytesMut::from("li"))),
payload.readexactly(2).ok().unwrap());
assert_eq!(Async::Ready(Some(Bytes::from_static(b"li"))),
payload.read_exact(2).ok().unwrap());
assert_eq!(payload.len, 3);
assert_eq!(Async::Ready(Some(BytesMut::from("ne1l"))),
payload.readexactly(4).ok().unwrap());
assert_eq!(Async::Ready(Some(Bytes::from_static(b"ne1l"))),
payload.read_exact(4).ok().unwrap());
assert_eq!(payload.len, 4);
sender.set_error(PayloadError::Incomplete);
payload.readexactly(10).err().unwrap();
payload.read_exact(10).err().unwrap();
let res: Result<(), ()> = Ok(());
result(res)
@@ -513,21 +583,21 @@ mod tests {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
assert_eq!(Async::NotReady, payload.readuntil(b"ne").ok().unwrap());
assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
assert_eq!(Async::Ready(Some(Bytes::from("line"))),
payload.readuntil(b"ne").ok().unwrap());
payload.read_until(b"ne").ok().unwrap());
assert_eq!(payload.len, 1);
assert_eq!(Async::Ready(Some(Bytes::from("1line2"))),
payload.readuntil(b"2").ok().unwrap());
payload.read_until(b"2").ok().unwrap());
assert_eq!(payload.len, 0);
sender.set_error(PayloadError::Incomplete);
payload.readuntil(b"b").err().unwrap();
payload.read_until(b"b").err().unwrap();
let res: Result<(), ()> = Ok(());
result(res)

View File

@@ -1,6 +1,7 @@
use std::rc::Rc;
use std::marker::PhantomData;
use smallvec::SmallVec;
use http::{Method, StatusCode};
use pred;
@@ -34,7 +35,7 @@ use httpresponse::HttpResponse;
pub struct Resource<S=()> {
name: String,
state: PhantomData<S>,
routes: Vec<Route<S>>,
routes: SmallVec<[Route<S>; 3]>,
middlewares: Rc<Vec<Box<Middleware<S>>>>,
}
@@ -43,7 +44,7 @@ impl<S> Default for Resource<S> {
Resource {
name: String::new(),
state: PhantomData,
routes: Vec::new(),
routes: SmallVec::new(),
middlewares: Rc::new(Vec::new()) }
}
}
@@ -54,7 +55,7 @@ impl<S> Resource<S> {
Resource {
name: String::new(),
state: PhantomData,
routes: Vec::new(),
routes: SmallVec::new(),
middlewares: Rc::new(Vec::new()) }
}

View File

@@ -192,134 +192,112 @@ impl<T, H> Http1<T, H>
let retry = self.reader.need_read() == PayloadStatus::Read;
loop {
// check in-flight messages
let mut io = false;
let mut idx = 0;
while idx < self.tasks.len() {
let item = &mut self.tasks[idx];
// check in-flight messages
let mut io = false;
let mut idx = 0;
while idx < self.tasks.len() {
let item = &mut self.tasks[idx];
if !io && !item.flags.contains(EntryFlags::EOF) {
// io is corrupted, send buffer
if item.flags.contains(EntryFlags::ERROR) {
if !io && !item.flags.contains(EntryFlags::EOF) {
// io is corrupted, send buffer
if item.flags.contains(EntryFlags::ERROR) {
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
match item.pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
if ready {
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::FINISHED);
}
},
// no more IO for this iteration
Ok(Async::NotReady) => {
if self.reader.need_read() == PayloadStatus::Read && !retry {
return Ok(Async::Ready(true));
}
io = true;
}
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
match item.pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
if ready {
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::FINISHED);
}
},
// no more IO for this iteration
Ok(Async::NotReady) => {
if self.reader.need_read() == PayloadStatus::Read && !retry {
return Ok(Async::Ready(true));
}
io = true;
}
Err(err) => {
// it is not possible to recover from error
// during pipe handling, so just drop connection
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady)
}
return Err(())
}
}
} else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED),
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err);
}
}
} else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED),
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err);
}
}
idx += 1;
}
idx += 1;
}
// cleanup finished tasks
let mut popped = false;
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF | EntryFlags::FINISHED) {
popped = true;
self.tasks.pop_front();
} else {
break
}
}
if need_read && popped {
return self.poll_io()
// cleanup finished tasks
let mut popped = false;
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF | EntryFlags::FINISHED) {
popped = true;
self.tasks.pop_front();
} else {
break
}
}
if need_read && popped {
return self.poll_io()
}
// no keep-alive
if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() {
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
}
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
}
// deal with keep-alive
if self.tasks.is_empty() {
// no keep-alive situations
if self.flags.contains(Flags::ERROR)
|| !self.flags.contains(Flags::KEEPALIVE)
|| !self.settings.keep_alive_enabled()
{
return Ok(Async::Ready(false))
}
// start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() {
// check stream state
if self.flags.contains(Flags::ERROR) {
return Ok(Async::Ready(false))
}
if self.settings.keep_alive_enabled() {
let keep_alive = self.settings.keep_alive();
if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut to = Timeout::new(
Duration::new(keep_alive, 0), Arbiter::handle()).unwrap();
// register timeout
let _ = to.poll();
self.keepalive_timer = Some(to);
}
} else {
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
}
// keep-alive is disabled, drop connection
return Ok(Async::Ready(false))
}
} else if !self.poll_completed(false)? ||
self.flags.contains(Flags::KEEPALIVE) {
// check stream state or
// if keep-alive unset, rely on operating system
return Ok(Async::NotReady)
} else {
return Ok(Async::Ready(false))
}
} else {
self.poll_completed(false)?;
return Ok(Async::NotReady)
// start keep-alive timer
let keep_alive = self.settings.keep_alive();
if self.keepalive_timer.is_none() && keep_alive > 0 {
trace!("Start keep-alive timer");
let mut timer = Timeout::new(
Duration::new(keep_alive, 0), Arbiter::handle()).unwrap();
// register timer
let _ = timer.poll();
self.keepalive_timer = Some(timer);
}
}
Ok(Async::NotReady)
}
}
@@ -868,7 +846,7 @@ mod tests {
use httpmessage::HttpMessage;
use application::HttpApplication;
use server::settings::WorkerSettings;
use server::IoStream;
use server::{IoStream, KeepAlive};
struct Buffer {
buf: Bytes,
@@ -939,7 +917,8 @@ mod tests {
macro_rules! parse_ready {
($e:expr) => ({
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
match Reader::new().parse($e, &mut BytesMut::new(), &settings) {
Ok(Async::Ready(req)) => req,
Ok(_) => panic!("Eof during parsing http request"),
@@ -961,7 +940,8 @@ mod tests {
macro_rules! expect_parse_err {
($e:expr) => ({
let mut buf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
match Reader::new().parse($e, &mut buf, &settings) {
Err(err) => match err {
@@ -979,7 +959,8 @@ mod tests {
fn test_parse() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -996,7 +977,8 @@ mod tests {
fn test_parse_partial() {
let mut buf = Buffer::new("PUT /test HTTP/1");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1019,7 +1001,8 @@ mod tests {
fn test_parse_post() {
let mut buf = Buffer::new("POST /test2 HTTP/1.0\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1036,7 +1019,8 @@ mod tests {
fn test_parse_body() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1055,7 +1039,8 @@ mod tests {
let mut buf = Buffer::new(
"\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1073,7 +1058,8 @@ mod tests {
fn test_parse_partial_eof() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) }
@@ -1093,7 +1079,8 @@ mod tests {
fn test_headers_split_field() {
let mut buf = Buffer::new("GET /test HTTP/1.1\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
not_ready!{ reader.parse(&mut buf, &mut readbuf, &settings) }
@@ -1123,7 +1110,8 @@ mod tests {
Set-Cookie: c1=cookie1\r\n\
Set-Cookie: c2=cookie2\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
match reader.parse(&mut buf, &mut readbuf, &settings) {
@@ -1358,7 +1346,8 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
@@ -1379,7 +1368,8 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
@@ -1408,7 +1398,8 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
@@ -1458,7 +1449,8 @@ mod tests {
"GET /test HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n");
let mut readbuf = BytesMut::new();
let settings = WorkerSettings::<HttpApplication>::new(Vec::new(), None);
let settings = WorkerSettings::<HttpApplication>::new(
Vec::new(), KeepAlive::Os);
let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));

View File

@@ -34,6 +34,7 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
written: u64,
headers_size: u32,
buffer: SharedBytes,
buffer_capacity: usize,
}
impl<T: AsyncWrite> H1Writer<T> {
@@ -45,6 +46,7 @@ impl<T: AsyncWrite> H1Writer<T> {
written: 0,
headers_size: 0,
buffer: buf,
buffer_capacity: 0,
stream,
}
}
@@ -66,27 +68,22 @@ impl<T: AsyncWrite> H1Writer<T> {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
}
fn write_to_stream(&mut self) -> io::Result<WriterState> {
while !self.buffer.is_empty() {
match self.stream.write(self.buffer.as_ref()) {
fn write_data(&mut self, data: &[u8]) -> io::Result<usize> {
let mut written = 0;
while written < data.len() {
match self.stream.write(&data[written..]) {
Ok(0) => {
self.disconnected();
return Ok(WriterState::Done);
},
Ok(n) => {
let _ = self.buffer.split_to(n);
return Err(io::Error::new(io::ErrorKind::WriteZero, ""))
},
Ok(n) => written += n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
}
return Ok(written)
}
Err(err) => return Err(err),
}
}
Ok(WriterState::Done)
Ok(written)
}
}
@@ -199,6 +196,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.written = bytes.len() as u64;
self.encoder.write(bytes)?;
} else {
// capacity, makes sense only for streaming or actor
self.buffer_capacity = msg.write_buffer_capacity();
msg.replace_body(body);
}
Ok(WriterState::Done)
@@ -211,18 +211,10 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// shortcut for upgraded connection
if self.flags.contains(Flags::UPGRADE) {
if self.buffer.is_empty() {
match self.stream.write(payload.as_ref()) {
Ok(0) => {
self.disconnected();
return Ok(WriterState::Done);
},
Ok(n) => if payload.len() < n {
self.buffer.extend_from_slice(&payload.as_ref()[n..])
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(WriterState::Done)
}
Err(err) => return Err(err),
let n = self.write_data(payload.as_ref())?;
if payload.len() < n {
self.buffer.extend_from_slice(&payload.as_ref()[n..]);
return Ok(WriterState::Done);
}
} else {
self.buffer.extend(payload);
@@ -259,16 +251,18 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
#[inline]
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
match self.write_to_stream() {
Ok(WriterState::Done) => {
if shutdown {
self.stream.shutdown()
} else {
Ok(Async::Ready(()))
}
},
Ok(WriterState::Pause) => Ok(Async::NotReady),
Err(err) => Err(err)
if !self.buffer.is_empty() {
let buf: &[u8] = unsafe{mem::transmute(self.buffer.as_ref())};
let written = self.write_data(buf)?;
let _ = self.buffer.split_to(written);
if self.buffer.len() > self.buffer_capacity {
return Ok(Async::NotReady)
}
}
if shutdown {
self.stream.shutdown()
} else {
Ok(Async::Ready(()))
}
}
}

View File

@@ -34,6 +34,7 @@ pub(crate) struct H2Writer {
flags: Flags,
written: u64,
buffer: SharedBytes,
buffer_capacity: usize,
}
impl H2Writer {
@@ -46,6 +47,7 @@ impl H2Writer {
flags: Flags::empty(),
written: 0,
buffer: buf,
buffer_capacity: 0,
}
}
@@ -71,7 +73,7 @@ impl H2Writer {
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@@ -111,8 +113,11 @@ impl Writer for H2Writer {
self.written
}
fn start(&mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse, encoding: ContentEncoding)
-> io::Result<WriterState> {
fn start(&mut self,
req: &mut HttpInnerMessage,
msg: &mut HttpResponse,
encoding: ContentEncoding) -> io::Result<WriterState>
{
// prepare response
self.flags.insert(Flags::STARTED);
self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding);
@@ -172,6 +177,7 @@ impl Writer for H2Writer {
Ok(WriterState::Pause)
} else {
msg.replace_body(body);
self.buffer_capacity = msg.write_buffer_capacity();
Ok(WriterState::Done)
}
}

View File

@@ -31,6 +31,35 @@ use httpresponse::HttpResponse;
/// max buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
#[derive(Debug, PartialEq, Clone, Copy)]
/// Server keep-alive setting
pub enum KeepAlive {
/// Keep alive in seconds
Timeout(usize),
/// Use `SO_KEEPALIVE` socket option, value in seconds
Tcp(usize),
/// Relay on OS to shutdown tcp connection
Os,
/// Disabled
Disabled,
}
impl From<usize> for KeepAlive {
fn from(keepalive: usize) -> Self {
KeepAlive::Timeout(keepalive)
}
}
impl From<Option<usize>> for KeepAlive {
fn from(keepalive: Option<usize>) -> Self {
if let Some(keepalive) = keepalive {
KeepAlive::Timeout(keepalive)
} else {
KeepAlive::Disabled
}
}
}
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.

View File

@@ -5,6 +5,7 @@ use std::cell::{Cell, RefCell, RefMut, UnsafeCell};
use futures_cpupool::{Builder, CpuPool};
use helpers;
use super::KeepAlive;
use super::channel::Node;
use super::shared::{SharedBytes, SharedBytesPool};
@@ -97,8 +98,8 @@ impl ServerSettings {
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
ka_enabled: bool,
bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
@@ -106,11 +107,16 @@ pub(crate) struct WorkerSettings<H> {
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: Option<u64>) -> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: KeepAlive) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true),
KeepAlive::Os | KeepAlive::Tcp(_) => (0, true),
KeepAlive::Disabled => (0, false),
};
WorkerSettings {
keep_alive, ka_enabled,
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
@@ -135,7 +141,7 @@ impl<H> WorkerSettings<H> {
}
pub fn keep_alive_enabled(&self) -> bool {
self.enabled
self.ka_enabled
}
pub fn get_shared_bytes(&self) -> SharedBytes {

View File

@@ -27,7 +27,7 @@ impl SharedBytesPool {
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
Rc::get_mut(&mut bytes).unwrap().clear();
v.push_front(bytes);
}
}
@@ -62,7 +62,7 @@ impl SharedBytes {
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
pub(crate) fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}

View File

@@ -20,13 +20,12 @@ use native_tls::TlsAcceptor;
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
use helpers;
use super::{IntoHttpHandler, IoStream};
use super::{IntoHttpHandler, IoStream, KeepAlive};
use super::{PauseServer, ResumeServer, StopServer};
use super::channel::{HttpChannel, WrapperStream};
use super::worker::{Conn, Worker, StreamHandlerType, StopWorker};
use super::settings::{ServerSettings, WorkerSettings};
/// An HTTP Server
pub struct HttpServer<H> where H: IntoHttpHandler + 'static
{
@@ -34,7 +33,7 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
threads: usize,
backlog: i32,
host: Option<String>,
keep_alive: Option<u64>,
keep_alive: KeepAlive,
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
#[cfg_attr(feature="cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Syn, Worker<H::Handler>>)>,
@@ -83,7 +82,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
threads: num_cpus::get(),
backlog: 2048,
host: None,
keep_alive: None,
keep_alive: KeepAlive::Os,
factory: Arc::new(f),
workers: Vec::new(),
sockets: HashMap::new(),
@@ -124,15 +123,9 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
/// Set server keep-alive setting.
///
/// By default keep alive is enabled.
///
/// - `Some(75)` - enable
///
/// - `Some(0)` - disable
///
/// - `None` - use `SO_KEEPALIVE` socket option
pub fn keep_alive(mut self, val: Option<u64>) -> Self {
self.keep_alive = val;
/// By default keep alive is set to a `Os`.
pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
self.keep_alive = val.into();
self
}

View File

@@ -23,7 +23,7 @@ use actix::*;
use actix::msgs::StopArbiter;
use helpers;
use server::HttpHandler;
use server::{HttpHandler, KeepAlive};
use server::channel::HttpChannel;
use server::settings::WorkerSettings;
@@ -48,21 +48,30 @@ impl Message for StopWorker {
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
pub(crate) struct Worker<H> where H: HttpHandler + 'static {
pub(crate)
struct Worker<H> where H: HttpHandler + 'static {
settings: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
tcp_ka: Option<time::Duration>,
}
impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>)
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive)
-> Worker<H>
{
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
Worker {
settings: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler,
tcp_ka,
}
}
@@ -106,9 +115,7 @@ impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>)
{
if !self.settings.keep_alive_enabled() &&
msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err()
{
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
error!("Can not set socket keep-alive option");
}
self.handler.handle(Rc::clone(&self.settings), &self.hnd, msg);

View File

@@ -2,17 +2,18 @@
use std::{fmt, io, str};
use std::rc::Rc;
use std::cell::UnsafeCell;
use std::time::Duration;
use base64;
use rand;
use bytes::Bytes;
use cookie::Cookie;
use byteorder::{ByteOrder, NetworkEndian};
use http::{HttpTryFrom, StatusCode, Error as HttpError};
use http::header::{self, HeaderName, HeaderValue};
use sha1::Sha1;
use futures::{Async, Future, Poll, Stream};
use futures::unsync::mpsc::{unbounded, UnboundedSender};
use byteorder::{ByteOrder, NetworkEndian};
use actix::prelude::*;
@@ -192,6 +193,14 @@ impl Client {
self
}
/// Set write buffer capacity
///
/// Default buffer capacity is 32kb
pub fn write_buffer_capacity(mut self, cap: usize) -> Self {
self.request.write_buffer_capacity(cap);
self
}
/// Set request header
pub fn header<K, V>(mut self, key: K, value: V) -> Self
where HeaderName: HttpTryFrom<K>, V: IntoHeaderValue
@@ -291,9 +300,32 @@ impl ClientHandshake {
request: None,
tx: None,
error: Some(err),
max_size: 0
max_size: 0,
}
}
/// Set handshake timeout
///
/// Handshake timeout is a total time before handshake should be completed.
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
if let Some(request) = self.request.take() {
self.request = Some(request.timeout(timeout));
}
self
}
/// Set connection timeout
///
/// Connection timeout includes resolving hostname and actual connection to
/// the host.
/// Default value is 1 second.
pub fn conn_timeout(mut self, timeout: Duration) -> Self {
if let Some(request) = self.request.take() {
self.request = Some(request.conn_timeout(timeout));
}
self
}
}
impl Future for ClientHandshake {

View File

@@ -1,4 +1,4 @@
use std::{fmt, mem};
use std::{fmt, mem, ptr};
use std::iter::FromIterator;
use bytes::{Bytes, BytesMut, BufMut};
use byteorder::{ByteOrder, BigEndian, NetworkEndian};
@@ -15,11 +15,8 @@ use ws::mask::apply_mask;
/// A struct representing a `WebSocket` frame.
#[derive(Debug)]
pub(crate) struct Frame {
pub struct Frame {
finished: bool,
rsv1: bool,
rsv2: bool,
rsv3: bool,
opcode: OpCode,
payload: Binary,
}
@@ -51,9 +48,11 @@ impl Frame {
Frame::message(payload, OpCode::Close, true, genmask)
}
/// Parse the input stream into a frame.
pub fn parse<S>(pl: &mut PayloadHelper<S>, server: bool, max_size: usize)
-> Poll<Option<Frame>, ProtocolError>
#[cfg_attr(feature="cargo-clippy", allow(type_complexity))]
fn read_copy_md<S>(pl: &mut PayloadHelper<S>,
server: bool,
max_size: usize
) -> Poll<Option<(usize, bool, OpCode, usize, Option<u32>)>, ProtocolError>
where S: Stream<Item=Bytes, Error=PayloadError>
{
let mut idx = 2;
@@ -74,12 +73,14 @@ impl Frame {
return Err(ProtocolError::MaskedFrame)
}
let rsv1 = first & 0x40 != 0;
let rsv2 = first & 0x20 != 0;
let rsv3 = first & 0x10 != 0;
// Op code
let opcode = OpCode::from(first & 0x0F);
let len = second & 0x7F;
if let OpCode::Bad = opcode {
return Err(ProtocolError::InvalidOpcode(first & 0x0F))
}
let len = second & 0x7F;
let length = if len == 126 {
let buf = match pl.copy(4)? {
Async::Ready(Some(buf)) => buf,
@@ -114,28 +115,129 @@ impl Frame {
Async::NotReady => return Ok(Async::NotReady),
};
let mut mask_bytes = [0u8; 4];
mask_bytes.copy_from_slice(&buf[idx..idx+4]);
let mask: &[u8] = &buf[idx..idx+4];
let mask_u32: u32 = unsafe {ptr::read_unaligned(mask.as_ptr() as *const u32)};
idx += 4;
Some(mask_bytes)
Some(mask_u32)
} else {
None
};
let mut data = match pl.readexactly(idx + length)? {
Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady),
};
Ok(Async::Ready(Some((idx, finished, opcode, length, mask))))
}
// get body
data.split_to(idx);
fn read_chunk_md(chunk: &[u8], server: bool, max_size: usize)
-> Poll<(usize, bool, OpCode, usize, Option<u32>), ProtocolError>
{
let chunk_len = chunk.len();
let mut idx = 2;
if chunk_len < 2 {
return Ok(Async::NotReady)
}
let first = chunk[0];
let second = chunk[1];
let finished = first & 0x80 != 0;
// check masking
let masked = second & 0x80 != 0;
if !masked && server {
return Err(ProtocolError::UnmaskedFrame)
} else if masked && !server {
return Err(ProtocolError::MaskedFrame)
}
// Op code
let opcode = OpCode::from(first & 0x0F);
// Disallow bad opcode
if let OpCode::Bad = opcode {
return Err(ProtocolError::InvalidOpcode(first & 0x0F))
}
let len = second & 0x7F;
let length = if len == 126 {
if chunk_len < 4 {
return Ok(Async::NotReady)
}
let len = NetworkEndian::read_uint(&chunk[idx..], 2) as usize;
idx += 2;
len
} else if len == 127 {
if chunk_len < 10 {
return Ok(Async::NotReady)
}
let len = NetworkEndian::read_uint(&chunk[idx..], 8) as usize;
idx += 8;
len
} else {
len as usize
};
// check for max allowed size
if length > max_size {
return Err(ProtocolError::Overflow)
}
let mask = if server {
if chunk_len < idx + 4 {
return Ok(Async::NotReady)
}
let mask: &[u8] = &chunk[idx..idx+4];
let mask_u32: u32 = unsafe {ptr::read_unaligned(mask.as_ptr() as *const u32)};
idx += 4;
Some(mask_u32)
} else {
None
};
Ok(Async::Ready((idx, finished, opcode, length, mask)))
}
/// Parse the input stream into a frame.
pub fn parse<S>(pl: &mut PayloadHelper<S>, server: bool, max_size: usize)
-> Poll<Option<Frame>, ProtocolError>
where S: Stream<Item=Bytes, Error=PayloadError>
{
// try to parse ws frame md from one chunk
let result = match pl.get_chunk()? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::Ready(Some(chunk)) => Frame::read_chunk_md(chunk, server, max_size)?,
};
let (idx, finished, opcode, length, mask) = match result {
// we may need to join several chunks
Async::NotReady => match Frame::read_copy_md(pl, server, max_size)? {
Async::Ready(Some(item)) => item,
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(None)),
},
Async::Ready(item) => item,
};
match pl.can_read(idx + length)? {
Async::Ready(Some(true)) => (),
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::Ready(Some(false)) | Async::NotReady => return Ok(Async::NotReady),
}
// remove prefix
pl.drop_payload(idx);
// no need for body
if length == 0 {
return Ok(Async::Ready(Some(Frame {
finished, opcode, payload: Binary::from("") })));
}
let data = match pl.read_exact(length)? {
Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => panic!(),
};
// control frames must have length <= 125
match opcode {
OpCode::Ping | OpCode::Pong if length > 125 => {
@@ -149,12 +251,14 @@ impl Frame {
}
// unmask
if let Some(ref mask) = mask {
apply_mask(&mut data, mask);
if let Some(mask) = mask {
#[allow(mutable_transmutes)]
let p: &mut [u8] = unsafe{let ptr: &[u8] = &data; mem::transmute(ptr)};
apply_mask(p, mask);
}
Ok(Async::Ready(Some(Frame {
finished, rsv1, rsv2, rsv3, opcode, payload: data.into() })))
finished, opcode, payload: data.into() })))
}
/// Generate binary representation
@@ -199,13 +303,13 @@ impl Frame {
};
if genmask {
let mask: [u8; 4] = rand::random();
let mask = rand::random::<u32>();
unsafe {
{
let buf_mut = buf.bytes_mut();
buf_mut[..4].copy_from_slice(&mask);
*(buf_mut as *mut _ as *mut u32) = mask;
buf_mut[4..payload_len+4].copy_from_slice(payload.as_ref());
apply_mask(&mut buf_mut[4..], &mask);
apply_mask(&mut buf_mut[4..], mask);
}
buf.advance_mut(payload_len + 4);
}
@@ -221,9 +325,6 @@ impl Default for Frame {
fn default() -> Frame {
Frame {
finished: true,
rsv1: false,
rsv2: false,
rsv3: false,
opcode: OpCode::Close,
payload: Binary::from(&b""[..]),
}
@@ -236,15 +337,11 @@ impl fmt::Display for Frame {
"
<FRAME>
final: {}
reserved: {} {} {}
opcode: {}
payload length: {}
payload: 0x{}
</FRAME>",
self.finished,
self.rsv1,
self.rsv2,
self.rsv3,
self.opcode,
self.payload.len(),
self.payload.as_ref().iter().map(
@@ -282,7 +379,6 @@ mod tests {
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let frame = extract(Frame::parse(&mut buf, false, 1024));
println!("FRAME: {}", frame);
assert!(!frame.finished);
assert_eq!(frame.opcode, OpCode::Text);
assert_eq!(frame.payload.as_ref(), &b"1"[..]);

View File

@@ -5,7 +5,7 @@ use std::ptr::copy_nonoverlapping;
/// Mask/unmask a frame.
#[inline]
pub fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) {
pub fn apply_mask(buf: &mut [u8], mask: u32) {
apply_mask_fast32(buf, mask)
}
@@ -18,34 +18,41 @@ fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) {
}
}
/// Faster version of `apply_mask()` which operates on 4-byte blocks.
/// Faster version of `apply_mask()` which operates on 8-byte blocks.
#[inline]
#[allow(dead_code)]
fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
// TODO replace this with read_unaligned() as it stabilizes.
let mask_u32 = unsafe {
let mut m: u32 = uninitialized();
#[allow(trivial_casts)]
copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4);
m
};
#[cfg_attr(feature="cargo-clippy", allow(cast_lossless))]
fn apply_mask_fast32(buf: &mut [u8], mask_u32: u32) {
let mut ptr = buf.as_mut_ptr();
let mut len = buf.len();
// Possible first unaligned block.
let head = min(len, (4 - (ptr as usize & 3)) & 3);
let head = min(len, (8 - (ptr as usize & 0x7)) & 0x3);
let mask_u32 = if head > 0 {
unsafe {
xor_mem(ptr, mask_u32, head);
ptr = ptr.offset(head as isize);
}
len -= head;
if cfg!(target_endian = "big") {
mask_u32.rotate_left(8 * head as u32)
let n = if head > 4 { head - 4 } else { head };
let mask_u32 = if n > 0 {
unsafe {
xor_mem(ptr, mask_u32, n);
ptr = ptr.offset(head as isize);
}
len -= n;
if cfg!(target_endian = "big") {
mask_u32.rotate_left(8 * n as u32)
} else {
mask_u32.rotate_right(8 * n as u32)
}
} else {
mask_u32.rotate_right(8 * head as u32)
mask_u32
};
if head > 4 {
unsafe {
*(ptr as *mut u32) ^= mask_u32;
ptr = ptr.offset(4);
len -= 4;
}
}
mask_u32
} else {
mask_u32
};
@@ -55,7 +62,20 @@ fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) {
}
// Properly aligned middle of the data.
while len > 4 {
if len >= 8 {
let mut mask_u64 = mask_u32 as u64;
mask_u64 = mask_u64 << 32 | mask_u32 as u64;
while len >= 8 {
unsafe {
*(ptr as *mut u64) ^= mask_u64;
ptr = ptr.offset(8);
len -= 8;
}
}
}
while len >= 4 {
unsafe {
*(ptr as *mut u32) ^= mask_u32;
ptr = ptr.offset(4);
@@ -83,6 +103,7 @@ unsafe fn xor_mem(ptr: *mut u8, mask: u32, len: usize) {
#[cfg(test)]
mod tests {
use std::ptr;
use super::{apply_mask_fallback, apply_mask_fast32};
#[test]
@@ -90,6 +111,8 @@ mod tests {
let mask = [
0x6d, 0xb6, 0xb2, 0x80,
];
let mask_u32: u32 = unsafe {ptr::read_unaligned(mask.as_ptr() as *const u32)};
let unmasked = vec![
0xf3, 0x00, 0x01, 0x02, 0x03, 0x80, 0x81, 0x82,
0xff, 0xfe, 0x00, 0x17, 0x74, 0xf9, 0x12, 0x03,
@@ -101,7 +124,7 @@ mod tests {
apply_mask_fallback(&mut masked, &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast, &mask);
apply_mask_fast32(&mut masked_fast, mask_u32);
assert_eq!(masked, masked_fast);
}
@@ -112,7 +135,7 @@ mod tests {
apply_mask_fallback(&mut masked[1..], &mask);
let mut masked_fast = unmasked.clone();
apply_mask_fast32(&mut masked_fast[1..], &mask);
apply_mask_fast32(&mut masked_fast[1..], mask_u32);
assert_eq!(masked, masked_fast);
}

View File

@@ -63,8 +63,8 @@ mod context;
mod mask;
mod client;
use self::frame::Frame;
use self::proto::{hash_key, OpCode};
pub use self::frame::Frame;
pub use self::proto::OpCode;
pub use self::proto::CloseCode;
pub use self::context::WebsocketContext;
pub use self::client::{Client, ClientError,
@@ -248,7 +248,7 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponseBuilder, Handsha
}
let key = {
let key = req.headers().get(header::SEC_WEBSOCKET_KEY).unwrap();
hash_key(key.as_ref())
proto::hash_key(key.as_ref())
};
Ok(HttpResponse::build(StatusCode::SWITCHING_PROTOCOLS)
@@ -304,7 +304,7 @@ impl<S> Stream for WsStream<S> where S: Stream<Item=Bytes, Error=PayloadError> {
}
match opcode {
OpCode::Continue => unimplemented!(),
OpCode::Continue => Err(ProtocolError::NoContinuation),
OpCode::Bad => {
self.closed = true;
Err(ProtocolError::BadOpCode)

View File

@@ -6,7 +6,7 @@ use base64;
use self::OpCode::*;
/// Operation codes as part of rfc6455.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(crate) enum OpCode {
pub enum OpCode {
/// Indicates a continuation frame of a fragmented message.
Continue,
/// Indicates a text data frame.

View File

@@ -35,8 +35,9 @@ fn main() {
-s, --size=[NUMBER] 'size of PUBLISH packet payload to send in KB'
-w, --warm-up=[SECONDS] 'seconds before counter values are considered for reporting'
-r, --sample-rate=[SECONDS] 'seconds between average reports'
-c, --concurrency=[NUMBER] 'number of websockt connections to open and use concurrently for sending'
-t, --threads=[NUMBER] 'number of threads to use'",
-c, --concurrency=[NUMBER] 'number of websocket connections to open and use concurrently for sending'
-t, --threads=[NUMBER] 'number of threads to use'
--max-payload=[NUMBER] 'max size of payload before reconnect KB'",
)
.get_matches();
@@ -50,9 +51,13 @@ fn main() {
let threads = parse_u64_default(matches.value_of("threads"), num_cpus::get() as u64);
let concurrency = parse_u64_default(matches.value_of("concurrency"), 1);
let payload_size: usize = match matches.value_of("size") {
Some(s) => parse_u64_default(Some(s), 0) as usize * 1024,
Some(s) => parse_u64_default(Some(s), 1) as usize * 1024,
None => 1024,
};
let max_payload_size: usize = match matches.value_of("max-payload") {
Some(s) => parse_u64_default(Some(s), 0) as usize * 1024,
None => 0,
};
let warmup_seconds = parse_u64_default(matches.value_of("warm-up"), 2) as u64;
let sample_rate = parse_u64_default(matches.value_of("sample-rate"), 1) as usize;
@@ -64,7 +69,10 @@ fn main() {
let sys = actix::System::new("ws-client");
let mut report = true;
let _: () = Perf{counters: perf_counters.clone(),
payload: payload.len(),
sample_rate_secs: sample_rate}.start();
for t in 0..threads {
let pl = payload.clone();
let ws = ws_url.clone();
@@ -72,40 +80,41 @@ fn main() {
let addr = Arbiter::new(format!("test {}", t));
addr.do_send(actix::msgs::Execute::new(move || -> Result<(), ()> {
let mut reps = report;
for _ in 0..concurrency {
let pl2 = pl.clone();
let perf2 = perf.clone();
let ws2 = ws.clone();
Arbiter::handle().spawn(
ws::Client::new(&ws).connect()
ws::Client::new(&ws)
.write_buffer_capacity(0)
.connect()
.map_err(|e| {
println!("Error: {}", e);
Arbiter::system().do_send(actix::msgs::SystemExit(0));
//Arbiter::system().do_send(actix::msgs::SystemExit(0));
()
})
.map(move |(reader, writer)| {
let addr: Addr<Syn, _> = ChatClient::create(move |ctx| {
ChatClient::add_stream(reader, ctx);
ChatClient{conn: writer,
ChatClient{url: ws2,
conn: writer,
payload: pl2,
report: reps,
bin: bin,
ts: time::precise_time_ns(),
perf_counters: perf2,
sample_rate_secs: sample_rate,
sent: 0,
max_payload_size: max_payload_size,
}
});
})
);
reps = false;
}
Ok(())
}));
report = false;
}
let _ = sys.run();
let res = sys.run();
}
fn parse_u64_default(input: Option<&str>, default: u64) -> u64 {
@@ -113,43 +122,33 @@ fn parse_u64_default(input: Option<&str>, default: u64) -> u64 {
.unwrap_or(default)
}
struct ChatClient{
conn: ws::ClientWriter,
payload: Arc<String>,
ts: u64,
bin: bool,
report: bool,
perf_counters: Arc<PerfCounters>,
struct Perf {
counters: Arc<PerfCounters>,
payload: usize,
sample_rate_secs: usize,
}
impl Actor for ChatClient {
impl Actor for Perf {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
self.send_text();
if self.report {
self.sample_rate(ctx);
}
}
fn stopping(&mut self, _: &mut Context<Self>) -> Running {
Arbiter::system().do_send(actix::msgs::SystemExit(0));
Running::Stop
self.sample_rate(ctx);
}
}
impl ChatClient {
impl Perf {
fn sample_rate(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(self.sample_rate_secs as u64, 0), |act, ctx| {
let req_count = act.perf_counters.pull_request_count();
let req_count = act.counters.pull_request_count();
if req_count != 0 {
let latency = act.perf_counters.pull_latency_ns();
let latency_max = act.perf_counters.pull_latency_max_ns();
let conns = act.counters.pull_connections_count();
let latency = act.counters.pull_latency_ns();
let latency_max = act.counters.pull_latency_max_ns();
println!(
"rate: {}, throughput: {:?} kb, latency: {}, latency max: {}",
"rate: {}, conns: {}, throughput: {:?} kb, latency: {}, latency max: {}",
req_count / act.sample_rate_secs,
(((req_count * act.payload.len()) as f64) / 1024.0) /
conns / act.sample_rate_secs,
(((req_count * act.payload) as f64) / 1024.0) /
act.sample_rate_secs as f64,
time::Duration::nanoseconds((latency / req_count as u64) as i64),
time::Duration::nanoseconds(latency_max as i64)
@@ -159,13 +158,71 @@ impl ChatClient {
act.sample_rate(ctx);
});
}
}
fn send_text(&mut self) {
self.ts = time::precise_time_ns();
if self.bin {
self.conn.binary(&self.payload);
struct ChatClient{
url: String,
conn: ws::ClientWriter,
payload: Arc<String>,
ts: u64,
bin: bool,
perf_counters: Arc<PerfCounters>,
sent: usize,
max_payload_size: usize,
}
impl Actor for ChatClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
self.send_text();
self.perf_counters.register_connection();
}
}
impl ChatClient {
fn send_text(&mut self) -> bool {
self.sent += self.payload.len();
if self.max_payload_size > 0 && self.sent > self.max_payload_size {
let ws = self.url.clone();
let pl = self.payload.clone();
let bin = self.bin;
let perf_counters = self.perf_counters.clone();
let max_payload_size = self.max_payload_size;
Arbiter::handle().spawn(
ws::Client::new(&self.url).connect()
.map_err(|e| {
println!("Error: {}", e);
Arbiter::system().do_send(actix::msgs::SystemExit(0));
()
})
.map(move |(reader, writer)| {
let addr: Addr<Syn, _> = ChatClient::create(move |ctx| {
ChatClient::add_stream(reader, ctx);
ChatClient{url: ws,
conn: writer,
payload: pl,
bin: bin,
ts: time::precise_time_ns(),
perf_counters: perf_counters,
sent: 0,
max_payload_size: max_payload_size,
}
});
})
);
false
} else {
self.conn.text(&self.payload);
self.ts = time::precise_time_ns();
if self.bin {
self.conn.binary(&self.payload);
} else {
self.conn.text(&self.payload);
}
true
}
}
}
@@ -183,7 +240,9 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for ChatClient {
if txt == self.payload.as_ref().as_str() {
self.perf_counters.register_request();
self.perf_counters.register_latency(time::precise_time_ns() - self.ts);
self.send_text();
if !self.send_text() {
ctx.stop();
}
} else {
println!("not eaqual");
}
@@ -196,6 +255,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for ChatClient {
pub struct PerfCounters {
req: AtomicUsize,
conn: AtomicUsize,
lat: AtomicUsize,
lat_max: AtomicUsize
}
@@ -204,6 +264,7 @@ impl PerfCounters {
pub fn new() -> PerfCounters {
PerfCounters {
req: AtomicUsize::new(0),
conn: AtomicUsize::new(0),
lat: AtomicUsize::new(0),
lat_max: AtomicUsize::new(0),
}
@@ -213,6 +274,10 @@ impl PerfCounters {
self.req.swap(0, Ordering::SeqCst)
}
pub fn pull_connections_count(&self) -> usize {
self.conn.swap(0, Ordering::SeqCst)
}
pub fn pull_latency_ns(&self) -> u64 {
self.lat.swap(0, Ordering::SeqCst) as u64
}
@@ -225,6 +290,10 @@ impl PerfCounters {
self.req.fetch_add(1, Ordering::SeqCst);
}
pub fn register_connection(&self) {
self.conn.fetch_add(1, Ordering::SeqCst);
}
pub fn register_latency(&self, nanos: u64) {
let nanos = nanos as usize;
self.lat.fetch_add(nanos, Ordering::SeqCst);