diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index fd893454..736db50d 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Added `LinesCodec`. [#338] + +[#338]: https://github.com/actix/actix-net/pull/338 ## 0.4.0 - 2021-04-20 diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 7bf1c941..52456405 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "actix-codec" version = "0.4.0" -authors = ["Nikolay Kim "] +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] description = "Codec utilities for working with framed protocols" keywords = ["network", "framework", "async", "futures"] repository = "https://github.com/actix/actix-net" @@ -19,6 +22,14 @@ bytes = "1" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } log = "0.4" +memchr = "2.3" pin-project-lite = "0.2" tokio = "1.5.1" tokio-util = { version = "0.6", features = ["codec", "io"] } + +[dev-dependencies] +criterion = { version = "0.3", features = ["html_reports"] } + +[[bench]] +name = "lines" +harness = false diff --git a/actix-codec/benches/lines.rs b/actix-codec/benches/lines.rs new file mode 100644 index 00000000..e32b8365 --- /dev/null +++ b/actix-codec/benches/lines.rs @@ -0,0 +1,57 @@ +use bytes::BytesMut; +use criterion::{criterion_group, criterion_main, Criterion}; + +const INPUT: &[u8] = include_bytes!("./lorem.txt"); + +fn bench_lines_codec(c: &mut Criterion) { + let mut decode_group = c.benchmark_group("lines decode"); + + decode_group.bench_function("actix", |b| { + b.iter(|| { + use actix_codec::Decoder as _; + + let mut codec = actix_codec::LinesCodec::default(); + let mut buf = BytesMut::from(INPUT); + while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {} + }); + }); + + decode_group.bench_function("tokio", |b| { + b.iter(|| { + use tokio_util::codec::Decoder as _; + + let mut 
codec = tokio_util::codec::LinesCodec::new(); + let mut buf = BytesMut::from(INPUT); + while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {} + }); + }); + + decode_group.finish(); + + let mut encode_group = c.benchmark_group("lines encode"); + + encode_group.bench_function("actix", |b| { + b.iter(|| { + use actix_codec::Encoder as _; + + let mut codec = actix_codec::LinesCodec::default(); + let mut buf = BytesMut::new(); + codec.encode("123", &mut buf).unwrap(); + }); + }); + + encode_group.bench_function("tokio", |b| { + b.iter(|| { + use tokio_util::codec::Encoder as _; + + let mut codec = tokio_util::codec::LinesCodec::new(); + let mut buf = BytesMut::new(); + codec.encode("123", &mut buf).unwrap(); + }); + }); + + encode_group.finish(); +} + +criterion_group!(benches, bench_lines_codec); +criterion_main!(benches); diff --git a/actix-codec/benches/lorem.txt b/actix-codec/benches/lorem.txt new file mode 100644 index 00000000..108b3c46 --- /dev/null +++ b/actix-codec/benches/lorem.txt @@ -0,0 +1,5 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit. In tortor quam, pulvinar sit amet vestibulum eget, tincidunt non urna. Sed eu sem in felis malesuada venenatis. Suspendisse volutpat aliquet nisi, in condimentum nibh convallis id. Quisque gravida felis scelerisque ipsum aliquam consequat. Praesent libero odio, malesuada vitae odio quis, aliquam aliquet enim. In fringilla ut turpis nec pharetra. Duis eu posuere metus. Sed a aliquet massa. Mauris non tempus mi, quis mattis libero. Vivamus ornare ex at semper cursus. Vestibulum sed facilisis erat, aliquet mollis est. In interdum, magna iaculis ultricies elementum, mi ante vestibulum mauris, nec viverra turpis lorem quis ante. Proin in auctor erat. Vivamus dictum congue massa, fermentum bibendum leo pretium quis. Integer dapibus sodales ligula, sit amet imperdiet felis suscipit eu. Phasellus non ornare enim. +Nam feugiat neque sit amet hendrerit rhoncus. Nunc suscipit molestie vehicula. 
Aenean vulputate porttitor augue, sit amet molestie dolor volutpat vitae. Nulla vitae condimentum eros. Aliquam tristique purus at metus lacinia egestas. Cras euismod lorem eu orci lobortis, sed tincidunt nisl laoreet. Ut suscipit fermentum mi, et euismod tortor. Pellentesque vitae tempor quam, sed dignissim mi. Suspendisse luctus lacus vitae ligula blandit vehicula. Quisque interdum iaculis tincidunt. Nunc elementum mi vitae tempor placerat. Suspendisse potenti. Donec blandit laoreet ipsum, quis rhoncus velit vulputate sed. +Aliquam suscipit lectus eros, at maximus dolor efficitur quis. Integer blandit tortor orci, nec mattis nunc eleifend ac. Mauris pharetra vel quam quis lacinia. Duis lobortis condimentum nunc ut facilisis. Praesent arcu nisi, porta sit amet viverra sit amet, pellentesque ut nisi. Nunc gravida tortor eu ligula tempus, in interdum magna pretium. Fusce eu ornare sapien. Nullam pellentesque cursus eros. Nam orci massa, faucibus eget leo eget, elementum vulputate erat. Fusce vehicula augue et dui hendrerit vulputate. Mauris neque lacus, porttitor ut condimentum id, efficitur ac neque. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Donec accumsan, lectus fermentum elementum tristique, ipsum tortor mollis ante, non lacinia nibh ex quis sapien. +Donec pharetra, elit eget rutrum luctus, urna ligula facilisis lorem, sit amet rhoncus ante est eu mi. Vestibulum vestibulum ultricies interdum. Nulla tincidunt ante non hendrerit venenatis. Curabitur vestibulum turpis erat, id efficitur quam venenatis eu. Fusce nulla sem, dapibus vel quam feugiat, ornare fermentum ligula. Praesent tempus tincidunt mauris, non pellentesque felis varius in. Aenean eu arcu ligula. Morbi dapibus maximus nulla a pharetra. Fusce leo metus, luctus ut cursus non, sollicitudin non lectus. Integer pellentesque eleifend erat, vel gravida purus tempus a. Mauris id vestibulum quam. Nunc vitae ullamcorper metus, pharetra placerat enim. 
Fusce in ultrices nisl. Curabitur justo mauris, dignissim in aliquam sit amet, sollicitudin ut risus. Cras tempor rutrum justo, non tincidunt est maximus at. +Aliquam ac velit tincidunt, ullamcorper velit sit amet, pulvinar nisi. Nullam rhoncus rhoncus egestas. Cras ac luctus nisi. Mauris sit amet risus at magna volutpat ultrices quis ac dui. Aliquam condimentum tellus purus, vel sagittis odio vulputate at. Sed ut finibus tellus. Aliquam tincidunt vehicula diam. diff --git a/actix-codec/src/bcodec.rs b/actix-codec/src/bcodec.rs index b06279ea..ca015b33 100644 --- a/actix-codec/src/bcodec.rs +++ b/actix-codec/src/bcodec.rs @@ -1,11 +1,10 @@ -use bytes::{Buf, Bytes, BytesMut}; use std::io; +use bytes::{Buf, Bytes, BytesMut}; + use super::{Decoder, Encoder}; -/// Bytes codec. -/// -/// Reads/Writes chunks of bytes from a stream. +/// Bytes codec. Reads/writes chunks of bytes from a stream. #[derive(Debug, Copy, Clone)] pub struct BytesCodec; diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index c7713bfe..5842fa7b 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -14,9 +14,11 @@ mod bcodec; mod framed; +mod lines; pub use self::bcodec::BytesCodec; pub use self::framed::{Framed, FramedParts}; +pub use self::lines::LinesCodec; pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub use tokio_util::codec::{Decoder, Encoder}; diff --git a/actix-codec/src/lines.rs b/actix-codec/src/lines.rs new file mode 100644 index 00000000..af399e8f --- /dev/null +++ b/actix-codec/src/lines.rs @@ -0,0 +1,158 @@ +use std::io; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use memchr::memchr; + +use super::{Decoder, Encoder}; + +/// Lines codec. Reads/writes line delimited strings. +/// +/// Will split input up by LF or CRLF delimiters. I.e. carriage return characters at the end of +/// lines are not preserved. 
+#[derive(Debug, Copy, Clone, Default)] +#[non_exhaustive] +pub struct LinesCodec; + +impl> Encoder for LinesCodec { + type Error = io::Error; + + #[inline] + fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { + let item = item.as_ref(); + dst.reserve(item.len() + 1); + dst.put_slice(item.as_bytes()); + dst.put_u8(b'\n'); + Ok(()) + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.is_empty() { + return Ok(None); + } + + let len = match memchr(b'\n', src) { + Some(n) => n, + None => { + return Ok(None); + } + }; + + // split up to new line char + let mut buf = src.split_to(len); + debug_assert_eq!(len, buf.len()); + + // remove new line char from source + src.advance(1); + + match buf.last() { + // remove carriage returns at the end of buf + Some(b'\r') => buf.truncate(len - 1), + + // line is empty + None => return Ok(Some(String::new())), + + _ => {} + } + + try_into_utf8(buf.freeze()) + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.decode(src)? { + Some(frame) => Ok(Some(frame)), + None if src.is_empty() => Ok(None), + None => { + let buf = match src.last() { + // if last line ends in a CR then take everything up to it + Some(b'\r') => src.split_to(src.len() - 1), + + // take all bytes from source + _ => src.split(), + }; + + if buf.is_empty() { + return Ok(None); + } + + try_into_utf8(buf.freeze()) + } + } + } +} + +// Attempts to convert bytes into a `String`. 
+fn try_into_utf8(buf: Bytes) -> io::Result> { + String::from_utf8(buf.to_vec()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err)) + .map(Some) +} + +#[cfg(test)] +mod tests { + use bytes::BufMut as _; + + use super::*; + + #[test] + fn lines_decoder() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::from("\nline 1\nline 2\r\nline 3\n\r\n\r"); + + assert_eq!("", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("line 1", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("line 2", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("line 3", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("", codec.decode(&mut buf).unwrap().unwrap()); + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert!(codec.decode_eof(&mut buf).unwrap().is_none()); + + buf.put_slice(b"k"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert_eq!("\rk", codec.decode_eof(&mut buf).unwrap().unwrap()); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert!(codec.decode_eof(&mut buf).unwrap().is_none()); + } + + #[test] + fn lines_encoder() { + let mut codec = LinesCodec::default(); + + let mut buf = BytesMut::new(); + + codec.encode("", &mut buf).unwrap(); + assert_eq!(&buf[..], b"\n"); + + codec.encode("test", &mut buf).unwrap(); + assert_eq!(&buf[..], b"\ntest\n"); + + codec.encode("a\nb", &mut buf).unwrap(); + assert_eq!(&buf[..], b"\ntest\na\nb\n"); + } + + #[test] + fn lines_encoder_no_overflow() { + let mut codec = LinesCodec::default(); + + let mut buf = BytesMut::new(); + codec.encode("1234567", &mut buf).unwrap(); + assert_eq!(&buf[..], b"1234567\n"); + + let mut buf = BytesMut::new(); + codec.encode("12345678", &mut buf).unwrap(); + assert_eq!(&buf[..], b"12345678\n"); + + let mut buf = BytesMut::new(); + codec.encode("123456789111213", &mut buf).unwrap(); + assert_eq!(&buf[..], b"123456789111213\n"); + + let mut buf = BytesMut::new(); + codec.encode("1234567891112131", &mut buf).unwrap(); + 
assert_eq!(&buf[..], b"1234567891112131\n"); + } +}