1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 22:51:07 +01:00

update actix-codec and actix-utils to tokio 1.0 (#237)

This commit is contained in:
fakeshadow 2020-12-28 11:16:37 +08:00 committed by GitHub
parent f48e3f4cb0
commit 2ee8f45f5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 53 additions and 46 deletions

View File

@ -16,16 +16,16 @@ members = [
] ]
[patch.crates-io] [patch.crates-io]
actix-codec = { path = "actix-codec" } actix-codec = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" }
actix-connect = { path = "actix-connect" } actix-connect = { path = "actix-connect" }
actix-rt = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-rt = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" }
actix-macros = { path = "actix-macros" } actix-macros = { path = "actix-macros" }
actix-server = { path = "actix-server" } actix-server = { path = "actix-server" }
actix-service = { path = "actix-service" } actix-service = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" }
actix-testing = { path = "actix-testing" } actix-testing = { path = "actix-testing" }
actix-threadpool = { path = "actix-threadpool" } actix-threadpool = { path = "actix-threadpool" }
actix-tls = { path = "actix-tls" } actix-tls = { path = "actix-tls" }
actix-tracing = { path = "actix-tracing" } actix-tracing = { path = "actix-tracing" }
actix-utils = { path = "actix-utils" } actix-utils = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" }
actix-router = { path = "router" } actix-router = { path = "router" }
bytestring = { path = "string" } bytestring = { path = "string" }

View File

@ -1,7 +1,10 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
* Upgrade `pin-project` to `1.0`. * Replace `pin-project` with `pin-project-lite`.
* Upgrade `tokio` dependency to `1`.
* Upgrade `tokio-util` dependency to `0.6`.
* Upgrade `bytes` dependency to `1`.
## 0.3.0 - 2020-08-23 ## 0.3.0 - 2020-08-23
* No changes from beta 2. * No changes from beta 2.

View File

@ -17,10 +17,10 @@ path = "src/lib.rs"
[dependencies] [dependencies]
bitflags = "1.2.1" bitflags = "1.2.1"
bytes = "0.5.2" bytes = "1"
futures-core = { version = "0.3.4", default-features = false } futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.7", default-features = false }
log = "0.4" log = "0.4"
pin-project = "1.0.0" pin-project-lite = "0.2"
tokio = { version = "0.2.5", default-features = false } tokio = "1"
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] } tokio-util = { version = "0.6", features = ["codec", "io"] }

View File

@ -14,7 +14,7 @@ impl Encoder<Bytes> for BytesCodec {
#[inline] #[inline]
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend_from_slice(item.bytes()); dst.extend_from_slice(item.chunk());
Ok(()) Ok(())
} }
} }

View File

@ -5,7 +5,6 @@ use std::{fmt, io};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use futures_sink::Sink; use futures_sink::Sink;
use pin_project::pin_project;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};
@ -21,22 +20,23 @@ bitflags::bitflags! {
} }
} }
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using pin_project_lite::pin_project! {
/// the `Encoder` and `Decoder` traits to encode and decode frames. /// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// /// the `Encoder` and `Decoder` traits to encode and decode frames.
/// Raw I/O objects work with byte sequences, but higher-level code usually ///
/// wants to batch these into meaningful chunks, called "frames". This /// Raw I/O objects work with byte sequences, but higher-level code usually
/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder` /// wants to batch these into meaningful chunks, called "frames". This
/// traits to handle encoding and decoding of message frames. Note that /// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder`
/// the incoming and outgoing frame types may be distinct. /// traits to handle encoding and decoding of message frames. Note that
#[pin_project] /// the incoming and outgoing frame types may be distinct.
pub struct Framed<T, U> { pub struct Framed<T, U> {
#[pin] #[pin]
io: T, io: T,
codec: U, codec: U,
flags: Flags, flags: Flags,
read_buf: BytesMut, read_buf: BytesMut,
write_buf: BytesMut, write_buf: BytesMut,
}
} }
impl<T, U> Framed<T, U> impl<T, U> Framed<T, U>
@ -220,7 +220,8 @@ impl<T, U> Framed<T, U> {
if remaining < LW { if remaining < LW {
this.read_buf.reserve(HW - remaining) this.read_buf.reserve(HW - remaining)
} }
let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) {
let cnt = match tokio_util::io::poll_read_buf(this.io, cx, this.read_buf) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(cnt)) => cnt, Poll::Ready(Ok(cnt)) => cnt,

View File

@ -18,5 +18,6 @@ mod framed;
pub use self::bcodec::BytesCodec; pub use self::bcodec::BytesCodec;
pub use self::framed::{Framed, FramedParts}; pub use self::framed::{Framed, FramedParts};
pub use tokio::io::{AsyncRead, AsyncWrite}; pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pub use tokio_util::codec::{Decoder, Encoder}; pub use tokio_util::codec::{Decoder, Encoder};
pub use tokio_util::io::poll_read_buf;

View File

@ -1,6 +1,7 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
* Update `bytes` dependency to `1`.
* Use `pin-project-lite` to replace `pin-project`. [#229] * Use `pin-project-lite` to replace `pin-project`. [#229]
* Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229] * Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229]

View File

@ -17,8 +17,8 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-codec = "0.3.0" actix-codec = "0.3.0"
actix-rt = "1.1.1" actix-rt = "2.0.0-beta.1"
actix-service = "1.0.6" actix-service = "2.0.0-beta.1"
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false }

View File

@ -9,7 +9,7 @@ use core::pin::Pin;
use core::task::{Context, Poll}; use core::task::{Context, Poll};
use core::{fmt, time}; use core::{fmt, time};
use actix_rt::time::{delay_for, Delay}; use actix_rt::time::{sleep, Sleep};
use actix_service::{IntoService, Service, Transform}; use actix_service::{IntoService, Service, Transform};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -85,8 +85,8 @@ where
{ {
type Response = S::Response; type Response = S::Response;
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type InitError = E;
type Transform = TimeoutService<S, Req>; type Transform = TimeoutService<S, Req>;
type InitError = E;
type Future = TimeoutFuture<Self::Transform, Self::InitError>; type Future = TimeoutFuture<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
@ -157,7 +157,7 @@ where
fn call(&mut self, request: Req) -> Self::Future { fn call(&mut self, request: Req) -> Self::Future {
TimeoutServiceResponse { TimeoutServiceResponse {
fut: self.service.call(request), fut: self.service.call(request),
sleep: delay_for(self.timeout), sleep: sleep(self.timeout),
} }
} }
} }
@ -171,7 +171,8 @@ pin_project! {
{ {
#[pin] #[pin]
fut: S::Future, fut: S::Future,
sleep: Delay, #[pin]
sleep: Sleep,
} }
} }
@ -193,20 +194,18 @@ where
} }
// Now check the sleep // Now check the sleep
Pin::new(this.sleep) this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout))
.poll(cx)
.map(|_| Err(TimeoutError::Timeout))
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::task::Poll; use core::task::Poll;
use std::time::Duration; use core::time::Duration;
use super::*; use super::*;
use actix_service::{apply, fn_factory, Service, ServiceFactory}; use actix_service::{apply, fn_factory, Service, ServiceFactory};
use futures_util::future::{ok, FutureExt, LocalBoxFuture}; use futures_core::future::LocalBoxFuture;
struct SleepService(Duration); struct SleepService(Duration);
@ -218,9 +217,11 @@ mod tests {
actix_service::always_ready!(); actix_service::always_ready!();
fn call(&mut self, _: ()) -> Self::Future { fn call(&mut self, _: ()) -> Self::Future {
actix_rt::time::delay_for(self.0) let sleep = actix_rt::time::sleep(self.0);
.then(|_| ok::<_, ()>(())) Box::pin(async move {
.boxed_local() sleep.await;
Ok(())
})
} }
} }
@ -249,7 +250,7 @@ mod tests {
let timeout = apply( let timeout = apply(
Timeout::new(resolution), Timeout::new(resolution),
fn_factory(|| ok::<_, ()>(SleepService(wait_time))), fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }),
); );
let mut srv = timeout.new_service(&()).await.unwrap(); let mut srv = timeout.new_service(&()).await.unwrap();