From ebdc983dfe34882e07addde9eb438a55f5e3bbce Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 8 Mar 2018 17:19:50 -0800 Subject: [PATCH] optimize websocket stream --- src/client/parser.rs | 1 - src/multipart.rs | 4 +-- src/payload.rs | 83 ++++++++++++++++++++++++++++++++++++-------- src/server/shared.rs | 4 +-- src/ws/frame.rs | 21 +++++++---- src/ws/mask.rs | 45 ++++++++++++++++++------ src/ws/mod.rs | 2 +- 7 files changed, 123 insertions(+), 37 deletions(-) diff --git a/src/client/parser.rs b/src/client/parser.rs index 3952ed3b7..8fe399009 100644 --- a/src/client/parser.rs +++ b/src/client/parser.rs @@ -78,7 +78,6 @@ impl HttpResponseParser { -> Poll, PayloadError> where T: IoStream { - println!("PARSE payload, {:?}", self.decoder.is_some()); if self.decoder.is_some() { loop { // read payload diff --git a/src/multipart.rs b/src/multipart.rs index f70c78a78..898a7f194 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -482,10 +482,10 @@ impl InnerField where S: Stream { if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" && &chunk[4..] == boundary.as_bytes() { - payload.unread_data(chunk.freeze()); + payload.unread_data(chunk); Ok(Async::Ready(None)) } else { - Ok(Async::Ready(Some(chunk.freeze()))) + Ok(Async::Ready(Some(chunk))) } } } diff --git a/src/payload.rs b/src/payload.rs index 3cefcf718..5fd960976 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -117,18 +117,21 @@ pub struct PayloadSender { impl PayloadWriter for PayloadSender { + #[inline] fn set_error(&mut self, err: PayloadError) { if let Some(shared) = self.inner.upgrade() { shared.borrow_mut().set_error(err) } } + #[inline] fn feed_eof(&mut self) { if let Some(shared) = self.inner.upgrade() { shared.borrow_mut().feed_eof() } } + #[inline] fn feed_data(&mut self, data: Bytes) { if let Some(shared) = self.inner.upgrade() { shared.borrow_mut().feed_data(data) @@ -172,24 +175,29 @@ impl Inner { } } + #[inline] fn set_error(&mut self, err: PayloadError) { self.err = Some(err); } + #[inline] fn feed_eof(&mut self) { self.eof = true; } + #[inline] fn feed_data(&mut self, data: Bytes) { self.len += data.len(); self.need_read = false; self.items.push_back(data); } + #[inline] fn eof(&self) -> bool { self.items.is_empty() && self.eof } + #[inline] fn len(&self) -> usize { self.len } @@ -247,6 +255,7 @@ impl PayloadHelper where S: Stream { } } + #[inline] fn poll_stream(&mut self) -> Poll { self.stream.poll().map(|res| { match res { @@ -261,6 +270,7 @@ impl PayloadHelper where S: Stream { }) } + #[inline] pub fn readany(&mut self) -> Poll, PayloadError> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); @@ -274,25 +284,70 @@ impl PayloadHelper where S: Stream { } } - pub fn readexactly(&mut self, size: usize) -> Poll, PayloadError> { + #[inline] + pub fn can_read(&mut self, size: usize) -> Poll, PayloadError> { if size <= self.len { - let mut buf = BytesMut::with_capacity(size); - while buf.len() < size { + Ok(Async::Ready(Some(true))) + } else { + match self.poll_stream()? { + Async::Ready(true) => self.can_read(size), + Async::Ready(false) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + } + + #[inline] + pub fn readexactly(&mut self, size: usize) -> Poll, PayloadError> { + if size <= self.len { + self.len -= size; + let mut chunk = self.items.pop_front().unwrap(); + if size < chunk.len() { + let buf = chunk.split_to(size); + self.items.push_front(chunk); + Ok(Async::Ready(Some(buf))) + } + else if size == chunk.len() { + Ok(Async::Ready(Some(chunk))) + } + else { + let mut buf = BytesMut::with_capacity(size); + buf.extend_from_slice(&chunk); + + while buf.len() < size { + let mut chunk = self.items.pop_front().unwrap(); + let rem = cmp::min(size - buf.len(), chunk.len()); + buf.extend_from_slice(&chunk.split_to(rem)); + if !chunk.is_empty() { + self.items.push_front(chunk); + } + } + Ok(Async::Ready(Some(buf.freeze()))) + } + } else { + match self.poll_stream()? { + Async::Ready(true) => self.readexactly(size), + Async::Ready(false) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + } + + #[inline] + pub fn drop_payload(&mut self, size: usize) { + if size <= self.len { + self.len -= size; + + let mut len = 0; + while len < size { let mut chunk = self.items.pop_front().unwrap(); - let rem = cmp::min(size - buf.len(), chunk.len()); - self.len -= rem; - buf.extend_from_slice(&chunk.split_to(rem)); - if !chunk.is_empty() { + let rem = cmp::min(size - len, chunk.len()); + len -= rem; + if rem < chunk.len() { + chunk.split_to(rem); self.items.push_front(chunk); } } - return Ok(Async::Ready(Some(buf))) - } - - match self.poll_stream()? { - Async::Ready(true) => self.readexactly(size), - Async::Ready(false) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), } } diff --git a/src/server/shared.rs b/src/server/shared.rs index f39a31605..ed87ca07c 100644 --- a/src/server/shared.rs +++ b/src/server/shared.rs @@ -27,7 +27,7 @@ impl SharedBytesPool { pub fn release_bytes(&self, mut bytes: Rc) { let v = &mut self.0.borrow_mut(); if v.len() < 128 { - Rc::get_mut(&mut bytes).unwrap().take(); + Rc::get_mut(&mut bytes).unwrap().clear(); v.push_front(bytes); } } @@ -62,7 +62,7 @@ impl SharedBytes { #[inline(always)] #[allow(mutable_transmutes)] #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] - pub fn get_mut(&self) -> &mut BytesMut { + pub(crate) fn get_mut(&self) -> &mut BytesMut { let r: &BytesMut = self.0.as_ref().unwrap().as_ref(); unsafe{mem::transmute(r)} } diff --git a/src/ws/frame.rs b/src/ws/frame.rs index 96162b5c6..c4841ce7e 100644 --- a/src/ws/frame.rs +++ b/src/ws/frame.rs @@ -122,14 +122,21 @@ impl Frame { None }; - let mut data = match pl.readexactly(idx + length)? { - Async::Ready(Some(buf)) => buf, + match pl.can_read(idx + length)? { + Async::Ready(Some(true)) => (), Async::Ready(None) => return Ok(Async::Ready(None)), - Async::NotReady => return Ok(Async::NotReady), - }; + Async::Ready(Some(false)) | Async::NotReady => return Ok(Async::NotReady), + } + + // remove prefix + pl.drop_payload(idx); // get body - data.split_to(idx); + let data = match pl.readexactly(length)? { + Async::Ready(Some(buf)) => buf, + Async::Ready(None) => return Ok(Async::Ready(None)), + Async::NotReady => panic!(), + }; // Disallow bad opcode if let OpCode::Bad = opcode { @@ -150,7 +157,9 @@ impl Frame { // unmask if let Some(ref mask) = mask { - apply_mask(&mut data, mask); + #[allow(mutable_transmutes)] + let p: &mut [u8] = unsafe{let ptr: &[u8] = &data; mem::transmute(ptr)}; + apply_mask(p, mask); } Ok(Async::Ready(Some(Frame { diff --git a/src/ws/mask.rs b/src/ws/mask.rs index 6c294b567..2e5a2960e 100644 --- a/src/ws/mask.rs +++ b/src/ws/mask.rs @@ -2,6 +2,7 @@ use std::cmp::min; use std::mem::uninitialized; use std::ptr::copy_nonoverlapping; +use std::ptr; /// Mask/unmask a frame. #[inline] @@ -18,17 +19,10 @@ fn apply_mask_fallback(buf: &mut [u8], mask: &[u8; 4]) { } } -/// Faster version of `apply_mask()` which operates on 4-byte blocks. +/// Faster version of `apply_mask()` which operates on 8-byte blocks. #[inline] -#[allow(dead_code)] fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) { - // TODO replace this with read_unaligned() as it stabilizes. - let mask_u32 = unsafe { - let mut m: u32 = uninitialized(); - #[allow(trivial_casts)] - copy_nonoverlapping(mask.as_ptr(), &mut m as *mut _ as *mut u8, 4); - m - }; + let mask_u32: u32 = unsafe {ptr::read_unaligned(mask.as_ptr() as *const u32)}; let mut ptr = buf.as_mut_ptr(); let mut len = buf.len(); @@ -41,10 +35,26 @@ fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) { ptr = ptr.offset(head as isize); } len -= head; - if cfg!(target_endian = "big") { + let mask_u32 = if cfg!(target_endian = "big") { mask_u32.rotate_left(8 * head as u32) } else { mask_u32.rotate_right(8 * head as u32) + }; + + let head = min(len, (4 - (ptr as usize & 3)) & 3); + if head > 0 { + unsafe { + xor_mem(ptr, mask_u32, head); + ptr = ptr.offset(head as isize); + } + len -= head; + if cfg!(target_endian = "big") { + mask_u32.rotate_left(8 * head as u32) + } else { + mask_u32.rotate_right(8 * head as u32) + } + } else { + mask_u32 } } else { mask_u32 @@ -55,7 +65,20 @@ fn apply_mask_fast32(buf: &mut [u8], mask: &[u8; 4]) { } // Properly aligned middle of the data. - while len > 4 { + if len >= 8 { + let mut mask_u64 = mask_u32 as u64; + mask_u64 = mask_u64 << 32 | mask_u32 as u64; + + while len >= 8 { + unsafe { + *(ptr as *mut u64) ^= mask_u64; + ptr = ptr.offset(8); + len -= 8; + } + } + } + + while len >= 4 { unsafe { *(ptr as *mut u32) ^= mask_u32; ptr = ptr.offset(4); diff --git a/src/ws/mod.rs b/src/ws/mod.rs index bf31189ca..6e540527a 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -304,7 +304,7 @@ impl Stream for WsStream where S: Stream { } match opcode { - OpCode::Continue => unimplemented!(), + OpCode::Continue => Err(ProtocolError::NoContinuation), OpCode::Bad => { self.closed = true; Err(ProtocolError::BadOpCode)