From 2ee8f45f5dc66486d071b13b1acc17c150568823 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 28 Dec 2020 11:16:37 +0800 Subject: [PATCH] update actix-codec and actix-utils to tokio 1.0 (#237) --- Cargo.toml | 6 +++--- actix-codec/CHANGES.md | 5 ++++- actix-codec/Cargo.toml | 12 ++++++------ actix-codec/src/bcodec.rs | 2 +- actix-codec/src/framed.rs | 37 +++++++++++++++++++------------------ actix-codec/src/lib.rs | 3 ++- actix-utils/CHANGES.md | 1 + actix-utils/Cargo.toml | 4 ++-- actix-utils/src/timeout.rs | 29 +++++++++++++++-------------- 9 files changed, 53 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 61280183..d46b6283 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,16 +16,16 @@ members = [ ] [patch.crates-io] -actix-codec = { path = "actix-codec" } +actix-codec = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-connect = { path = "actix-connect" } actix-rt = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-macros = { path = "actix-macros" } actix-server = { path = "actix-server" } -actix-service = { path = "actix-service" } +actix-service = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-testing = { path = "actix-testing" } actix-threadpool = { path = "actix-threadpool" } actix-tls = { path = "actix-tls" } actix-tracing = { path = "actix-tracing" } -actix-utils = { path = "actix-utils" } +actix-utils = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-router = { path = "router" } bytestring = { path = "string" } diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index cd4424d0..3d82775a 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,7 +1,10 @@ # Changes ## Unreleased - 2020-xx-xx -* Upgrade `pin-project` to `1.0`. +* Replace `pin-project` with `pin-project-lite`. +* Upgrade `tokio` dependency to `1`. +* Upgrade `tokio-util` dependency to `0.6`. +* Upgrade `bytes` dependency to `1`. ## 0.3.0 - 2020-08-23 * No changes from beta 2. diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 1214945a..e901efd5 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -17,10 +17,10 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" -bytes = "0.5.2" -futures-core = { version = "0.3.4", default-features = false } -futures-sink = { version = "0.3.4", default-features = false } +bytes = "1" +futures-core = { version = "0.3.7", default-features = false } +futures-sink = { version = "0.3.7", default-features = false } log = "0.4" -pin-project = "1.0.0" -tokio = { version = "0.2.5", default-features = false } -tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] } +pin-project-lite = "0.2" +tokio = "1" +tokio-util = { version = "0.6", features = ["codec", "io"] } diff --git a/actix-codec/src/bcodec.rs b/actix-codec/src/bcodec.rs index 045b20a2..b06279ea 100644 --- a/actix-codec/src/bcodec.rs +++ b/actix-codec/src/bcodec.rs @@ -14,7 +14,7 @@ impl Encoder for BytesCodec { #[inline] fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.extend_from_slice(item.bytes()); + dst.extend_from_slice(item.chunk()); Ok(()) } } diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 844f20d8..cf2297dc 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -5,7 +5,6 @@ use std::{fmt, io}; use bytes::{Buf, BytesMut}; use futures_core::{ready, Stream}; use futures_sink::Sink; -use pin_project::pin_project; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; @@ -21,22 +20,23 @@ bitflags::bitflags! { } } -/// A unified `Stream` and `Sink` interface to an underlying I/O object, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// Raw I/O objects work with byte sequences, but higher-level code usually -/// wants to batch these into meaningful chunks, called "frames". This -/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder` -/// traits to handle encoding and decoding of message frames. Note that -/// the incoming and outgoing frame types may be distinct. -#[pin_project] -pub struct Framed { - #[pin] - io: T, - codec: U, - flags: Flags, - read_buf: BytesMut, - write_buf: BytesMut, +pin_project_lite::pin_project! { + /// A unified `Stream` and `Sink` interface to an underlying I/O object, using + /// the `Encoder` and `Decoder` traits to encode and decode frames. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder` + /// traits to handle encoding and decoding of message frames. Note that + /// the incoming and outgoing frame types may be distinct. + pub struct Framed { + #[pin] + io: T, + codec: U, + flags: Flags, + read_buf: BytesMut, + write_buf: BytesMut, + } } impl Framed @@ -220,7 +220,8 @@ impl Framed { if remaining < LW { this.read_buf.reserve(HW - remaining) } - let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) { + + let cnt = match tokio_util::io::poll_read_buf(this.io, cx, this.read_buf) { Poll::Pending => return Poll::Pending, Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), Poll::Ready(Ok(cnt)) => cnt, diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 8c346052..9e875409 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -18,5 +18,6 @@ mod framed; pub use self::bcodec::BytesCodec; pub use self::framed::{Framed, FramedParts}; -pub use tokio::io::{AsyncRead, AsyncWrite}; +pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub use tokio_util::codec::{Decoder, Encoder}; +pub use tokio_util::io::poll_read_buf; diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index b4d59ed0..b112d8b1 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,6 +1,7 @@ # Changes ## Unreleased - 2020-xx-xx +* Update `bytes` dependency to `1`. * Use `pin-project-lite` to replace `pin-project`. [#229] * Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229] diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index fb7ed151..3ed4a518 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] actix-codec = "0.3.0" -actix-rt = "1.1.1" -actix-service = "1.0.6" +actix-rt = "2.0.0-beta.1" +actix-service = "2.0.0-beta.1" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index a27e9ffb..612c3cb4 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -9,7 +9,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use core::{fmt, time}; -use actix_rt::time::{delay_for, Delay}; +use actix_rt::time::{sleep, Sleep}; use actix_service::{IntoService, Service, Transform}; use pin_project_lite::pin_project; @@ -85,8 +85,8 @@ where { type Response = S::Response; type Error = TimeoutError; - type InitError = E; type Transform = TimeoutService; + type InitError = E; type Future = TimeoutFuture; fn new_transform(&self, service: S) -> Self::Future { @@ -157,7 +157,7 @@ where fn call(&mut self, request: Req) -> Self::Future { TimeoutServiceResponse { fut: self.service.call(request), - sleep: delay_for(self.timeout), + sleep: sleep(self.timeout), } } } @@ -171,7 +171,8 @@ pin_project! { { #[pin] fut: S::Future, - sleep: Delay, + #[pin] + sleep: Sleep, } } @@ -193,20 +194,18 @@ where } // Now check the sleep - Pin::new(this.sleep) - .poll(cx) - .map(|_| Err(TimeoutError::Timeout)) + this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout)) } } #[cfg(test)] mod tests { - use std::task::Poll; - use std::time::Duration; + use core::task::Poll; + use core::time::Duration; use super::*; use actix_service::{apply, fn_factory, Service, ServiceFactory}; - use futures_util::future::{ok, FutureExt, LocalBoxFuture}; + use futures_core::future::LocalBoxFuture; struct SleepService(Duration); @@ -218,9 +217,11 @@ mod tests { actix_service::always_ready!(); fn call(&mut self, _: ()) -> Self::Future { - actix_rt::time::delay_for(self.0) - .then(|_| ok::<_, ()>(())) - .boxed_local() + let sleep = actix_rt::time::sleep(self.0); + Box::pin(async move { + sleep.await; + Ok(()) + }) } } @@ -249,7 +250,7 @@ mod tests { let timeout = apply( Timeout::new(resolution), - fn_factory(|| ok::<_, ()>(SleepService(wait_time))), + fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }), ); let mut srv = timeout.new_service(&()).await.unwrap();