mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-13 18:58:23 +02:00
Compare commits
1 Commits
remove-mps
...
tls-v3.0.0
Author | SHA1 | Date | |
---|---|---|---|
|
06ddad0051 |
48
.github/workflows/ci.yml
vendored
48
.github/workflows/ci.yml
vendored
@@ -39,21 +39,21 @@ jobs:
|
|||||||
profile: minimal
|
profile: minimal
|
||||||
override: true
|
override: true
|
||||||
|
|
||||||
- name: Install MSYS2
|
# - name: Install MSYS2
|
||||||
if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
# if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
||||||
uses: msys2/setup-msys2@v2
|
# uses: msys2/setup-msys2@v2
|
||||||
- name: Install MinGW Packages
|
# - name: Install MinGW Packages
|
||||||
if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
# if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
||||||
run: |
|
# run: |
|
||||||
msys2 -c 'pacman -Sy --noconfirm pacman'
|
# msys2 -c 'pacman -Sy --noconfirm pacman'
|
||||||
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
|
# msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
|
||||||
|
|
||||||
- name: Generate Cargo.lock
|
# - name: Generate Cargo.lock
|
||||||
uses: actions-rs/cargo@v1
|
# uses: actions-rs/cargo@v1
|
||||||
with:
|
# with:
|
||||||
command: generate-lockfile
|
# command: generate-lockfile
|
||||||
- name: Cache Dependencies
|
# - name: Cache Dependencies
|
||||||
uses: Swatinem/rust-cache@v1.2.0
|
# uses: Swatinem/rust-cache@v1.2.0
|
||||||
|
|
||||||
- name: Install cargo-hack
|
- name: Install cargo-hack
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
@@ -65,13 +65,27 @@ jobs:
|
|||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: hack
|
command: hack
|
||||||
args: --clean-per-run check --workspace --no-default-features --tests
|
args: check --workspace --no-default-features
|
||||||
|
|
||||||
- name: check full
|
- name: check minimal + tests
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: hack
|
||||||
|
args: check --workspace --no-default-features --tests --examples
|
||||||
|
|
||||||
|
- name: check default
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: check
|
command: check
|
||||||
args: --workspace --bins --examples --tests
|
args: --workspace --tests --examples
|
||||||
|
|
||||||
|
- name: check full
|
||||||
|
# TODO: compile OpenSSL and run tests on MinGW
|
||||||
|
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: check
|
||||||
|
args: --workspace --all-features --tests --examples
|
||||||
|
|
||||||
- name: tests
|
- name: tests
|
||||||
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
|
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
|
||||||
|
@@ -235,7 +235,7 @@ impl<T, U> Framed<T, U> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Flush write buffer to underlying I/O stream.
|
/// Flush write buffer to underlying I/O stream.
|
||||||
pub fn poll_flush<I>(
|
pub fn flush<I>(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Result<(), U::Error>>
|
) -> Poll<Result<(), U::Error>>
|
||||||
@@ -271,7 +271,7 @@ impl<T, U> Framed<T, U> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Flush write buffer and shutdown underlying I/O stream.
|
/// Flush write buffer and shutdown underlying I/O stream.
|
||||||
pub fn poll_close<I>(
|
pub fn close<I>(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Result<(), U::Error>>
|
) -> Poll<Result<(), U::Error>>
|
||||||
@@ -319,11 +319,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
self.poll_flush(cx)
|
self.flush(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
self.poll_close(cx)
|
self.close(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
|
|
||||||
|
## 2.1.0 - 2021-02-24
|
||||||
* Add `ActixStream` extension trait to include readiness methods. [#276]
|
* Add `ActixStream` extension trait to include readiness methods. [#276]
|
||||||
* Re-export `tokio::net::TcpSocket` in `net` module [#282]
|
* Re-export `tokio::net::TcpSocket` in `net` module [#282]
|
||||||
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-rt"
|
name = "actix-rt"
|
||||||
version = "2.0.2"
|
version = "2.1.0"
|
||||||
authors = [
|
authors = [
|
||||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
"Rob Ede <robjtede@icloud.com>",
|
"Rob Ede <robjtede@icloud.com>",
|
||||||
@@ -8,7 +8,7 @@ authors = [
|
|||||||
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
|
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
|
||||||
keywords = ["async", "futures", "io", "runtime"]
|
keywords = ["async", "futures", "io", "runtime"]
|
||||||
homepage = "https://actix.rs"
|
homepage = "https://actix.rs"
|
||||||
repository = "https://github.com/actix/actix-net.git"
|
repository = "https://github.com/actix/actix-net"
|
||||||
documentation = "https://docs.rs/actix-rt"
|
documentation = "https://docs.rs/actix-rt"
|
||||||
categories = ["network-programming", "asynchronous"]
|
categories = ["network-programming", "asynchronous"]
|
||||||
license = "MIT OR Apache-2.0"
|
license = "MIT OR Apache-2.0"
|
||||||
|
@@ -2,4 +2,13 @@
|
|||||||
|
|
||||||
> Tokio-based single-threaded async runtime for the Actix ecosystem.
|
> Tokio-based single-threaded async runtime for the Actix ecosystem.
|
||||||
|
|
||||||
|
[](https://crates.io/crates/actix-rt)
|
||||||
|
[](https://docs.rs/actix-rt/2.1.0)
|
||||||
|
[](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
|
||||||
|

|
||||||
|
<br />
|
||||||
|
[](https://deps.rs/crate/actix-rt/2.1.0)
|
||||||
|

|
||||||
|
[](https://discord.gg/WghFtEH6Hb)
|
||||||
|
|
||||||
See crate documentation for more: https://docs.rs/actix-rt.
|
See crate documentation for more: https://docs.rs/actix-rt.
|
||||||
|
@@ -1,9 +1,12 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
|
|
||||||
|
## 3.0.0-beta.4 - 2021-02-24
|
||||||
* Rename `accept::openssl::{SslStream => TlsStream}`.
|
* Rename `accept::openssl::{SslStream => TlsStream}`.
|
||||||
* Add `connect::Connect::set_local_addr` to attach local `Ipaddr`. [#282]
|
* Add `connect::Connect::set_local_addr` to attach local `IpAddr`. [#282]
|
||||||
* `connector::TcpConnector` service would try to bind to local_addr of `IpAddr` when given [#282]
|
* `connector::TcpConnector` service will try to bind to local_addr of `IpAddr` when given. [#282]
|
||||||
|
|
||||||
[#282]: https://github.com/actix/actix-net/pull/282
|
[#282]: https://github.com/actix/actix-net/pull/282
|
||||||
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-tls"
|
name = "actix-tls"
|
||||||
version = "3.0.0-beta.3"
|
version = "3.0.0-beta.4"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "TLS acceptor and connector services for Actix ecosystem"
|
description = "TLS acceptor and connector services for Actix ecosystem"
|
||||||
keywords = ["network", "tls", "ssl", "async", "transport"]
|
keywords = ["network", "tls", "ssl", "async", "transport"]
|
||||||
@@ -41,7 +41,7 @@ uri = ["http"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.4.0-beta.1"
|
actix-codec = "0.4.0-beta.1"
|
||||||
actix-rt = { version = "2.0.0", default-features = false }
|
actix-rt = { version = "2.1.0", default-features = false }
|
||||||
actix-service = "2.0.0-beta.4"
|
actix-service = "2.0.0-beta.4"
|
||||||
actix-utils = "3.0.0-beta.2"
|
actix-utils = "3.0.0-beta.2"
|
||||||
|
|
||||||
@@ -69,7 +69,7 @@ features = ["vendored"]
|
|||||||
optional = true
|
optional = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.0.0"
|
actix-rt = "2.1.0"
|
||||||
actix-server = "2.0.0-beta.3"
|
actix-server = "2.0.0-beta.3"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
env_logger = "0.8"
|
env_logger = "0.8"
|
||||||
@@ -78,5 +78,5 @@ log = "0.4"
|
|||||||
trust-dns-resolver = "0.20.0"
|
trust-dns-resolver = "0.20.0"
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "basic"
|
name = "tcp-rustls"
|
||||||
required-features = ["accept", "rustls"]
|
required-features = ["accept", "rustls"]
|
||||||
|
@@ -24,7 +24,6 @@ futures-core = { version = "0.3.7", default-features = false }
|
|||||||
futures-sink = { version = "0.3.7", default-features = false }
|
futures-sink = { version = "0.3.7", default-features = false }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pin-project-lite = "0.2.0"
|
pin-project-lite = "0.2.0"
|
||||||
tokio = { version = "1", features = ["sync"] }
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.0.0"
|
actix-rt = "2.0.0"
|
||||||
|
@@ -1,20 +1,21 @@
|
|||||||
//! Framed dispatcher service and related utilities.
|
//! Framed dispatcher service and related utilities.
|
||||||
|
|
||||||
use core::{
|
#![allow(type_alias_bounds)]
|
||||||
fmt,
|
|
||||||
future::Future,
|
use core::future::Future;
|
||||||
mem,
|
use core::pin::Pin;
|
||||||
pin::Pin,
|
use core::task::{Context, Poll};
|
||||||
task::{Context, Poll},
|
use core::{fmt, mem};
|
||||||
};
|
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||||
use actix_service::{IntoService, Service};
|
use actix_service::{IntoService, Service};
|
||||||
|
use futures_core::stream::Stream;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
/// Framed transport errors.
|
use crate::mpsc;
|
||||||
|
|
||||||
|
/// Framed transport errors
|
||||||
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
|
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
|
||||||
Service(E),
|
Service(E),
|
||||||
Encoder(<U as Encoder<I>>::Error),
|
Encoder(<U as Encoder<I>>::Error),
|
||||||
@@ -63,7 +64,8 @@ pub enum Message<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pin_project! {
|
pin_project! {
|
||||||
/// Dispatcher is a future that reads frames from Framed object and passes them to the service.
|
/// Dispatcher is a future that reads frames from Framed object
|
||||||
|
/// and passes them to the service.
|
||||||
pub struct Dispatcher<S, T, U, I>
|
pub struct Dispatcher<S, T, U, I>
|
||||||
where
|
where
|
||||||
S: Service<<U as Decoder>::Item, Response = I>,
|
S: Service<<U as Decoder>::Item, Response = I>,
|
||||||
@@ -80,8 +82,8 @@ pin_project! {
|
|||||||
state: State<S, U, I>,
|
state: State<S, U, I>,
|
||||||
#[pin]
|
#[pin]
|
||||||
framed: Framed<T, U>,
|
framed: Framed<T, U>,
|
||||||
rx: mpsc::UnboundedReceiver<Result<Message<I>, S::Error>>,
|
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
|
||||||
tx: mpsc::UnboundedSender<Result<Message<I>, S::Error>>,
|
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,7 +134,26 @@ where
|
|||||||
where
|
where
|
||||||
F: IntoService<S, <U as Decoder>::Item>,
|
F: IntoService<S, <U as Decoder>::Item>,
|
||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
|
Dispatcher {
|
||||||
|
framed,
|
||||||
|
rx,
|
||||||
|
tx,
|
||||||
|
service: service.into_service(),
|
||||||
|
state: State::Processing,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct new `Dispatcher` instance with customer `mpsc::Receiver`
|
||||||
|
pub fn with_rx<F>(
|
||||||
|
framed: Framed<T, U>,
|
||||||
|
service: F,
|
||||||
|
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
|
||||||
|
) -> Self
|
||||||
|
where
|
||||||
|
F: IntoService<S, <U as Decoder>::Item>,
|
||||||
|
{
|
||||||
|
let tx = rx.sender();
|
||||||
Dispatcher {
|
Dispatcher {
|
||||||
framed,
|
framed,
|
||||||
rx,
|
rx,
|
||||||
@@ -143,28 +164,28 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get sink
|
/// Get sink
|
||||||
pub fn tx(&self) -> mpsc::UnboundedSender<Result<Message<I>, S::Error>> {
|
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
|
||||||
self.tx.clone()
|
self.tx.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get reference to a service wrapped by `Dispatcher` instance.
|
/// Get reference to a service wrapped by `Dispatcher` instance.
|
||||||
pub fn service(&self) -> &S {
|
pub fn get_ref(&self) -> &S {
|
||||||
&self.service
|
&self.service
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
|
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
|
||||||
pub fn service_mut(&mut self) -> &mut S {
|
pub fn get_mut(&mut self) -> &mut S {
|
||||||
&mut self.service
|
&mut self.service
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get reference to a framed instance wrapped by `Dispatcher`
|
/// Get reference to a framed instance wrapped by `Dispatcher`
|
||||||
/// instance.
|
/// instance.
|
||||||
pub fn framed(&self) -> &Framed<T, U> {
|
pub fn get_framed(&self) -> &Framed<T, U> {
|
||||||
&self.framed
|
&self.framed
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
|
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
|
||||||
pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
|
pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> {
|
||||||
&mut self.framed
|
&mut self.framed
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -225,7 +246,7 @@ where
|
|||||||
loop {
|
loop {
|
||||||
let mut this = self.as_mut().project();
|
let mut this = self.as_mut().project();
|
||||||
while !this.framed.is_write_buf_full() {
|
while !this.framed.is_write_buf_full() {
|
||||||
match this.rx.poll_recv(cx) {
|
match Pin::new(&mut this.rx).poll_next(cx) {
|
||||||
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
|
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
|
||||||
if let Err(err) = this.framed.as_mut().write(msg) {
|
if let Err(err) = this.framed.as_mut().write(msg) {
|
||||||
*this.state = State::FramedError(DispatcherError::Encoder(err));
|
*this.state = State::FramedError(DispatcherError::Encoder(err));
|
||||||
@@ -245,7 +266,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !this.framed.is_write_buf_empty() {
|
if !this.framed.is_write_buf_empty() {
|
||||||
match this.framed.poll_flush(cx) {
|
match this.framed.flush(cx) {
|
||||||
Poll::Pending => break,
|
Poll::Pending => break,
|
||||||
Poll::Ready(Ok(_)) => (),
|
Poll::Ready(Ok(_)) => (),
|
||||||
Poll::Ready(Err(err)) => {
|
Poll::Ready(Err(err)) => {
|
||||||
@@ -277,43 +298,41 @@ where
|
|||||||
type Output = Result<(), DispatcherError<S::Error, U, I>>;
|
type Output = Result<(), DispatcherError<S::Error, U, I>>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.as_mut().project();
|
loop {
|
||||||
|
let this = self.as_mut().project();
|
||||||
|
|
||||||
match this.state {
|
return match this.state {
|
||||||
State::Processing => {
|
State::Processing => {
|
||||||
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
|
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
|
||||||
self.poll(cx)
|
continue;
|
||||||
} else {
|
} else {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
State::Error(_) => {
|
||||||
|
// flush write buffer
|
||||||
State::Error(_) => {
|
if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() {
|
||||||
// flush write buffer
|
return Poll::Pending;
|
||||||
if !this.framed.is_write_buf_empty() && this.framed.poll_flush(cx).is_pending()
|
}
|
||||||
{
|
Poll::Ready(Err(this.state.take_error()))
|
||||||
return Poll::Pending;
|
|
||||||
}
|
}
|
||||||
|
State::FlushAndStop => {
|
||||||
Poll::Ready(Err(this.state.take_error()))
|
if !this.framed.is_write_buf_empty() {
|
||||||
}
|
match this.framed.flush(cx) {
|
||||||
|
Poll::Ready(Err(err)) => {
|
||||||
State::FlushAndStop => {
|
debug!("Error sending data: {:?}", err);
|
||||||
if !this.framed.is_write_buf_empty() {
|
Poll::Ready(Ok(()))
|
||||||
this.framed.poll_flush(cx).map(|res| {
|
}
|
||||||
if let Err(err) = res {
|
Poll::Pending => Poll::Pending,
|
||||||
debug!("Error sending data: {:?}", err);
|
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
Ok(())
|
Poll::Ready(Ok(()))
|
||||||
})
|
}
|
||||||
} else {
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
}
|
||||||
}
|
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
|
||||||
|
State::Stopping => Poll::Ready(Ok(())),
|
||||||
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
|
};
|
||||||
State::Stopping => Poll::Ready(Ok(())),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -7,5 +7,6 @@
|
|||||||
|
|
||||||
pub mod counter;
|
pub mod counter;
|
||||||
pub mod dispatcher;
|
pub mod dispatcher;
|
||||||
|
pub mod mpsc;
|
||||||
pub mod task;
|
pub mod task;
|
||||||
pub mod timeout;
|
pub mod timeout;
|
||||||
|
224
actix-utils/src/mpsc.rs
Normal file
224
actix-utils/src/mpsc.rs
Normal file
@@ -0,0 +1,224 @@
|
|||||||
|
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
|
||||||
|
|
||||||
|
use core::any::Any;
|
||||||
|
use core::cell::RefCell;
|
||||||
|
use core::fmt;
|
||||||
|
use core::pin::Pin;
|
||||||
|
use core::task::{Context, Poll};
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::error::Error;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use futures_core::stream::Stream;
|
||||||
|
use futures_sink::Sink;
|
||||||
|
|
||||||
|
use crate::task::LocalWaker;
|
||||||
|
|
||||||
|
/// Creates a unbounded in-memory channel with buffered storage.
|
||||||
|
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||||
|
let shared = Rc::new(RefCell::new(Shared {
|
||||||
|
has_receiver: true,
|
||||||
|
buffer: VecDeque::new(),
|
||||||
|
blocked_recv: LocalWaker::new(),
|
||||||
|
}));
|
||||||
|
let sender = Sender {
|
||||||
|
shared: shared.clone(),
|
||||||
|
};
|
||||||
|
let receiver = Receiver { shared };
|
||||||
|
(sender, receiver)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Shared<T> {
|
||||||
|
buffer: VecDeque<T>,
|
||||||
|
blocked_recv: LocalWaker,
|
||||||
|
has_receiver: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The transmission end of a channel.
|
||||||
|
///
|
||||||
|
/// This is created by the `channel` function.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Sender<T> {
|
||||||
|
shared: Rc<RefCell<Shared<T>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Unpin for Sender<T> {}
|
||||||
|
|
||||||
|
impl<T> Sender<T> {
|
||||||
|
/// Sends the provided message along this channel.
|
||||||
|
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
|
||||||
|
let mut shared = self.shared.borrow_mut();
|
||||||
|
if !shared.has_receiver {
|
||||||
|
return Err(SendError(item)); // receiver was dropped
|
||||||
|
};
|
||||||
|
shared.buffer.push_back(item);
|
||||||
|
shared.blocked_recv.wake();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Closes the sender half
|
||||||
|
///
|
||||||
|
/// This prevents any further messages from being sent on the channel while
|
||||||
|
/// still enabling the receiver to drain messages that are buffered.
|
||||||
|
pub fn close(&mut self) {
|
||||||
|
self.shared.borrow_mut().has_receiver = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for Sender<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Sender {
|
||||||
|
shared: self.shared.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Sink<T> for Sender<T> {
|
||||||
|
type Error = SendError<T>;
|
||||||
|
|
||||||
|
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError<T>> {
|
||||||
|
self.send(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), SendError<T>>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Drop for Sender<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let count = Rc::strong_count(&self.shared);
|
||||||
|
let shared = self.shared.borrow_mut();
|
||||||
|
|
||||||
|
// check is last sender is about to drop
|
||||||
|
if shared.has_receiver && count == 2 {
|
||||||
|
// Wake up receiver as its stream has ended
|
||||||
|
shared.blocked_recv.wake();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The receiving end of a channel which implements the `Stream` trait.
|
||||||
|
///
|
||||||
|
/// This is created by the `channel` function.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Receiver<T> {
|
||||||
|
shared: Rc<RefCell<Shared<T>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Receiver<T> {
|
||||||
|
/// Create Sender
|
||||||
|
pub fn sender(&self) -> Sender<T> {
|
||||||
|
Sender {
|
||||||
|
shared: self.shared.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Unpin for Receiver<T> {}
|
||||||
|
|
||||||
|
impl<T> Stream for Receiver<T> {
|
||||||
|
type Item = T;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let mut shared = self.shared.borrow_mut();
|
||||||
|
if Rc::strong_count(&self.shared) == 1 {
|
||||||
|
// All senders have been dropped, so drain the buffer and end the
|
||||||
|
// stream.
|
||||||
|
Poll::Ready(shared.buffer.pop_front())
|
||||||
|
} else if let Some(msg) = shared.buffer.pop_front() {
|
||||||
|
Poll::Ready(Some(msg))
|
||||||
|
} else {
|
||||||
|
shared.blocked_recv.register(cx.waker());
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Drop for Receiver<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let mut shared = self.shared.borrow_mut();
|
||||||
|
shared.buffer.clear();
|
||||||
|
shared.has_receiver = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Error type for sending, used when the receiving end of a channel is
|
||||||
|
/// dropped
|
||||||
|
pub struct SendError<T>(T);
|
||||||
|
|
||||||
|
impl<T> fmt::Debug for SendError<T> {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
fmt.debug_tuple("SendError").field(&"...").finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> fmt::Display for SendError<T> {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(fmt, "send failed because receiver is gone")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Any> Error for SendError<T> {
|
||||||
|
fn description(&self) -> &str {
|
||||||
|
"send failed because receiver is gone"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> SendError<T> {
|
||||||
|
/// Returns the message that was attempted to be sent but failed.
|
||||||
|
pub fn into_inner(self) -> T {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use futures_util::future::lazy;
|
||||||
|
use futures_util::{stream::Stream, StreamExt};
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_mpsc() {
|
||||||
|
let (tx, mut rx) = channel();
|
||||||
|
tx.send("test").unwrap();
|
||||||
|
assert_eq!(rx.next().await.unwrap(), "test");
|
||||||
|
|
||||||
|
let tx2 = tx.clone();
|
||||||
|
tx2.send("test2").unwrap();
|
||||||
|
assert_eq!(rx.next().await.unwrap(), "test2");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
|
||||||
|
Poll::Pending
|
||||||
|
);
|
||||||
|
drop(tx2);
|
||||||
|
assert_eq!(
|
||||||
|
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
|
||||||
|
Poll::Pending
|
||||||
|
);
|
||||||
|
drop(tx);
|
||||||
|
assert_eq!(rx.next().await, None);
|
||||||
|
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
tx.send("test").unwrap();
|
||||||
|
drop(rx);
|
||||||
|
assert!(tx.send("test").is_err());
|
||||||
|
|
||||||
|
let (mut tx, _) = channel();
|
||||||
|
let tx2 = tx.clone();
|
||||||
|
tx.close();
|
||||||
|
assert!(tx.send("test").is_err());
|
||||||
|
assert!(tx2.send("test").is_err());
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user