1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-03 19:42:13 +01:00

Merge branch 'master' into rt-fix

This commit is contained in:
Rob Ede 2021-02-24 09:55:29 +00:00 committed by GitHub
commit 7c249e8834
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 284 additions and 277 deletions

103
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,103 @@
name: CI
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches: [master]
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
target:
- { name: Linux, os: ubuntu-latest, triple: x86_64-unknown-linux-gnu }
- { name: macOS, os: macos-latest, triple: x86_64-apple-darwin }
- { name: Windows, os: windows-latest, triple: x86_64-pc-windows-msvc }
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
version:
- 1.46.0 # MSRV
- stable
- nightly
name: ${{ matrix.target.name }} / ${{ matrix.version }}
runs-on: ${{ matrix.target.os }}
steps:
- name: Setup Routing
if: matrix.target.os == 'macos-latest'
run: sudo ifconfig lo0 alias 127.0.0.3
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-${{ matrix.target.triple }}
profile: minimal
override: true
- name: Install MSYS2
if: matrix.target.triple == 'x86_64-pc-windows-gnu'
uses: msys2/setup-msys2@v2
- name: Install MinGW Packages
if: matrix.target.triple == 'x86_64-pc-windows-gnu'
run: |
msys2 -c 'pacman -Sy --noconfirm pacman'
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with:
command: generate-lockfile
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1.2.0
- name: Install cargo-hack
uses: actions-rs/cargo@v1
with:
command: install
args: cargo-hack
- name: check minimal
uses: actions-rs/cargo@v1
with:
command: hack
args: --clean-per-run check --workspace --no-default-features --tests
- name: check full
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests
- name: tests
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --all-features --no-fail-fast -- --nocapture
- name: Generate coverage file
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with:
file: cobertura.xml
- name: Clear the cargo caches
run: |
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo-cache

View File

@ -1,34 +1,42 @@
name: Lint
on: on:
pull_request: pull_request:
types: [opened, synchronize, reopened] types: [opened, synchronize, reopened]
name: Clippy and rustfmt Check
jobs: jobs:
clippy_check: fmt:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1 - name: Install Rust
uses: actions-rs/toolchain@v1
with: with:
toolchain: stable toolchain: stable
components: rustfmt
profile: minimal profile: minimal
components: rustfmt
override: true override: true
- name: Check with rustfmt - name: Rustfmt Check
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: fmt command: fmt
args: --all -- --check args: --all -- --check
- uses: actions-rs/toolchain@v1 clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with: with:
toolchain: nightly toolchain: stable
components: clippy
profile: minimal profile: minimal
components: clippy
override: true override: true
- name: Check with Clippy - name: Clippy Check
uses: actions-rs/clippy-check@v1 uses: actions-rs/clippy-check@v1
with: with:
token: ${{ secrets.GITHUB_TOKEN }} token: ${{ secrets.GITHUB_TOKEN }}
args: --workspace --tests args: --workspace --tests --all-features

View File

@ -1,82 +0,0 @@
name: CI (Linux)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- 1.46.0
- stable
- nightly
name: ${{ matrix.version }} - x86_64-unknown-linux-gnu
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with:
command: generate-lockfile
- name: Cache cargo dirs
uses: actions/cache@v2
with:
path:
~/.cargo/registry
~/.cargo/git
~/.cargo/bin
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo build
uses: actions/cache@v2
with:
path: target
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
with:
command: test
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
- name: Generate coverage file
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --workspace
- name: Upload to Codecov
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
uses: codecov/codecov-action@v1
with:
file: cobertura.xml
- name: Clear the cargo caches
run: |
rustup update stable
rustup override set stable
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo-cache

View File

@ -1,43 +0,0 @@
name: CI (macOS)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
name: ${{ matrix.version }} - x86_64-apple-darwin
runs-on: macos-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-apple-darwin
profile: minimal
override: true
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture

35
.github/workflows/upload-doc.yml vendored Normal file
View File

@ -0,0 +1,35 @@
name: Upload documentation
on:
push:
branches: [master]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Build Docs
uses: actions-rs/cargo@v1
with:
command: doc
args: --workspace --all-features --no-deps
- name: Tweak HTML
run: echo '<meta http-equiv="refresh" content="0;url=actix_server/index.html">' > target/doc/index.html
- name: Deploy to GitHub Pages
uses: JamesIves/github-pages-deploy-action@3.7.1
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BRANCH: gh-pages
FOLDER: target/doc

View File

@ -1,45 +0,0 @@
name: CI (Windows-mingw)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
name: ${{ matrix.version }} - x86_64-pc-windows-gnu
runs-on: windows-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-pc-windows-gnu
profile: minimal
override: true
- name: Install MSYS2
uses: msys2/setup-msys2@v2
- name: Install packages
run: |
msys2 -c 'pacman -Sy --noconfirm pacman'
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests

View File

@ -1,69 +0,0 @@
name: CI (Windows)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
env:
VCPKGRS_DYNAMIC: 1
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
target:
- x86_64-pc-windows-msvc
- i686-pc-windows-msvc
name: ${{ matrix.version }} - ${{ matrix.target }}
runs-on: windows-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-${{ matrix.target }}
profile: minimal
override: true
- name: Install OpenSSL (x64)
if: matrix.target == 'x86_64-pc-windows-msvc'
run: |
vcpkg integrate install
vcpkg install openssl:x64-windows
Get-ChildItem C:\vcpkg\installed\x64-windows\bin
Get-ChildItem C:\vcpkg\installed\x64-windows\lib
Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll
Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll
- name: Install OpenSSL (x86)
if: matrix.target == 'i686-pc-windows-msvc'
run: |
vcpkg integrate install
vcpkg install openssl:x86-windows
Get-ChildItem C:\vcpkg\installed\x86-windows\bin
Get-ChildItem C:\vcpkg\installed\x86-windows\lib
Copy-Item C:\vcpkg\installed\x86-windows\bin\libcrypto-1_1.dll C:\vcpkg\installed\x86-windows\bin\libcrypto.dll
Copy-Item C:\vcpkg\installed\x86-windows\bin\libssl-1_1.dll C:\vcpkg\installed\x86-windows\bin\libssl.dll
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture

View File

@ -670,8 +670,6 @@ pub(crate) fn insert_slash(path: &str) -> String {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use http::Uri;
use std::convert::TryFrom;
#[test] #[test]
fn test_parse_static() { fn test_parse_static() {
@ -833,8 +831,11 @@ mod tests {
assert!(re.is_match("/user/2345/sdg")); assert!(re.is_match("/user/2345/sdg"));
} }
#[cfg(feature = "http")]
#[test] #[test]
fn test_parse_urlencoded_param() { fn test_parse_urlencoded_param() {
use std::convert::TryFrom;
let re = ResourceDef::new("/user/{id}/test"); let re = ResourceDef::new("/user/{id}/test");
let mut path = Path::new("/user/2345/test"); let mut path = Path::new("/user/2345/test");
@ -845,7 +846,7 @@ mod tests {
assert!(re.match_path(&mut path)); assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%25"); assert_eq!(path.get("id").unwrap(), "qwe%25");
let uri = Uri::try_from("/user/qwe%25/test").unwrap(); let uri = http::Uri::try_from("/user/qwe%25/test").unwrap();
let mut path = Path::new(uri); let mut path = Path::new(uri);
assert!(re.match_path(&mut path)); assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%25"); assert_eq!(path.get("id").unwrap(), "qwe%25");

View File

@ -2,8 +2,10 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Add `ActixStream` extension trait to include readiness methods. [#276] * Add `ActixStream` extension trait to include readiness methods. [#276]
* Re-export `tokio::net::TcpSocket` in `net` module [#282]
[#276]: https://github.com/actix/actix-net/pull/276 [#276]: https://github.com/actix/actix-net/pull/276
[#282]: https://github.com/actix/actix-net/pull/282
## 2.0.2 - 2021-02-06 ## 2.0.2 - 2021-02-06

View File

@ -79,7 +79,7 @@ pub mod net {
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
pub use tokio::net::UdpSocket; pub use tokio::net::UdpSocket;
pub use tokio::net::{TcpListener, TcpStream}; pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
#[cfg(unix)] #[cfg(unix)]
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream}; pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};

View File

@ -12,7 +12,6 @@ repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-server" documentation = "https://docs.rs/actix-server"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config"]
edition = "2018" edition = "2018"
[lib] [lib]
@ -33,7 +32,7 @@ log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] } mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
slab = "0.4" slab = "0.4"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1.2", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0" actix-rt = "2.0.0"

View File

@ -8,7 +8,7 @@ use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandle; use crate::worker::WorkerHandle;
/// waker token for `mio::Poll` instance /// Waker token for `mio::Poll` instance.
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
@ -30,7 +30,7 @@ impl Deref for WakerQueue {
} }
impl WakerQueue { impl WakerQueue {
/// construct a waker queue with given `Poll`'s `Registry` and capacity. /// Construct a waker queue with given `Poll`'s `Registry` and capacity.
/// ///
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
/// event's token for it to properly handle `WakerInterest`. /// event's token for it to properly handle `WakerInterest`.
@ -41,7 +41,7 @@ impl WakerQueue {
Ok(Self(Arc::new((waker, queue)))) Ok(Self(Arc::new((waker, queue))))
} }
/// push a new interest to the queue and wake up the accept poll afterwards. /// Push a new interest to the queue and wake up the accept poll afterwards.
pub(crate) fn wake(&self, interest: WakerInterest) { pub(crate) fn wake(&self, interest: WakerInterest) {
let (waker, queue) = self.deref(); let (waker, queue) = self.deref();
@ -55,20 +55,20 @@ impl WakerQueue {
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
} }
/// get a MutexGuard of the waker queue. /// Get a MutexGuard of the waker queue.
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> { pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
self.deref().1.lock().expect("Failed to lock WakerQueue") self.deref().1.lock().expect("Failed to lock WakerQueue")
} }
/// reset the waker queue so it does not grow infinitely. /// Reset the waker queue so it does not grow infinitely.
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) { pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue); std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
} }
} }
/// types of interests we would look into when `Accept`'s `Poll` is waked up by waker. /// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
/// ///
/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related /// These interests should not be confused with `mio::Interest` and mostly not I/O related
pub(crate) enum WakerInterest { pub(crate) enum WakerInterest {
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
/// available and can accept new tasks. /// available and can accept new tasks.

View File

@ -102,8 +102,8 @@ pub trait Service<Req> {
/// call and the next invocation of `call` results in an error. /// call and the next invocation of `call` results in an error.
/// ///
/// # Notes /// # Notes
/// 1. `.poll_ready()` might be called on different task from actual service call. /// 1. `poll_ready` might be called on a different task to `call`.
/// 1. In case of chained services, `.poll_ready()` get called for all services at once. /// 1. In cases of chained services, `.poll_ready()` is called for all services at once.
fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>; fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Process the request and return the response asynchronously. /// Process the request and return the response asynchronously.

View File

@ -2,6 +2,10 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Rename `accept::openssl::{SslStream => TlsStream}`. * Rename `accept::openssl::{SslStream => TlsStream}`.
* Add `connect::Connect::set_local_addr` to attach local `Ipaddr`. [#282]
* `connector::TcpConnector` service would try to bind to local_addr of `IpAddr` when given [#282]
[#282]: https://github.com/actix/actix-net/pull/282
## 3.0.0-beta.3 - 2021-02-06 ## 3.0.0-beta.3 - 2021-02-06

View File

@ -52,7 +52,7 @@ log = "0.4"
tokio-util = { version = "0.6.3", default-features = false } tokio-util = { version = "0.6.3", default-features = false }
# openssl # openssl
tls-openssl = { package = "openssl", version = "0.10", optional = true } tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
tokio-openssl = { version = "0.6", optional = true } tokio-openssl = { version = "0.6", optional = true }
# rustls # rustls
@ -62,6 +62,12 @@ webpki-roots = { version = "0.21", optional = true }
# native-tls # native-tls
tokio-native-tls = { version = "0.3", optional = true } tokio-native-tls = { version = "0.3", optional = true }
[target.'cfg(windows)'.dependencies.tls-openssl]
version = "0.10.9"
package = "openssl"
features = ["vendored"]
optional = true
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0" actix-rt = "2.0.0"
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.3"

View File

@ -3,7 +3,7 @@ use std::{
fmt, fmt,
iter::{self, FromIterator as _}, iter::{self, FromIterator as _},
mem, mem,
net::SocketAddr, net::{IpAddr, SocketAddr},
}; };
/// Parse a host into parts (hostname and port). /// Parse a host into parts (hostname and port).
@ -67,6 +67,7 @@ pub struct Connect<T> {
pub(crate) req: T, pub(crate) req: T,
pub(crate) port: u16, pub(crate) port: u16,
pub(crate) addr: ConnectAddrs, pub(crate) addr: ConnectAddrs,
pub(crate) local_addr: Option<IpAddr>,
} }
impl<T: Address> Connect<T> { impl<T: Address> Connect<T> {
@ -78,6 +79,7 @@ impl<T: Address> Connect<T> {
req, req,
port: port.unwrap_or(0), port: port.unwrap_or(0),
addr: ConnectAddrs::None, addr: ConnectAddrs::None,
local_addr: None,
} }
} }
@ -88,6 +90,7 @@ impl<T: Address> Connect<T> {
req, req,
port: 0, port: 0,
addr: ConnectAddrs::One(addr), addr: ConnectAddrs::One(addr),
local_addr: None,
} }
} }
@ -119,6 +122,12 @@ impl<T: Address> Connect<T> {
self self
} }
/// Set local_addr of connect.
pub fn set_local_addr(mut self, addr: impl Into<IpAddr>) -> Self {
self.local_addr = Some(addr.into());
self
}
/// Get hostname. /// Get hostname.
pub fn hostname(&self) -> &str { pub fn hostname(&self) -> &str {
self.req.hostname() self.req.hostname()
@ -285,7 +294,7 @@ fn parse_host(host: &str) -> (&str, Option<u16>) {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::{IpAddr, Ipv4Addr}; use std::net::Ipv4Addr;
use super::*; use super::*;
@ -329,4 +338,13 @@ mod tests {
let mut iter = ConnectAddrsIter::None; let mut iter = ConnectAddrsIter::None;
assert_eq!(iter.next(), None); assert_eq!(iter.next(), None);
} }
#[test]
fn test_local_addr() {
let conn = Connect::new("hello").set_local_addr([127, 0, 0, 1]);
assert_eq!(
conn.local_addr.unwrap(),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
)
}
} }

View File

@ -2,12 +2,12 @@ use std::{
collections::VecDeque, collections::VecDeque,
future::Future, future::Future,
io, io,
net::SocketAddr, net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use actix_rt::net::TcpStream; use actix_rt::net::{TcpSocket, TcpStream};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use log::{error, trace}; use log::{error, trace};
@ -54,9 +54,14 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
fn call(&self, req: Connect<T>) -> Self::Future { fn call(&self, req: Connect<T>) -> Self::Future {
let port = req.port(); let port = req.port();
let Connect { req, addr, .. } = req; let Connect {
req,
addr,
local_addr,
..
} = req;
TcpConnectorResponse::new(req, port, addr) TcpConnectorResponse::new(req, port, local_addr, addr)
} }
} }
@ -65,6 +70,7 @@ pub enum TcpConnectorResponse<T> {
Response { Response {
req: Option<T>, req: Option<T>,
port: u16, port: u16,
local_addr: Option<IpAddr>,
addrs: Option<VecDeque<SocketAddr>>, addrs: Option<VecDeque<SocketAddr>>,
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>, stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
}, },
@ -72,7 +78,12 @@ pub enum TcpConnectorResponse<T> {
} }
impl<T: Address> TcpConnectorResponse<T> { impl<T: Address> TcpConnectorResponse<T> {
pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse<T> { pub(crate) fn new(
req: T,
port: u16,
local_addr: Option<IpAddr>,
addr: ConnectAddrs,
) -> TcpConnectorResponse<T> {
if addr.is_none() { if addr.is_none() {
error!("TCP connector: unresolved connection address"); error!("TCP connector: unresolved connection address");
return TcpConnectorResponse::Error(Some(ConnectError::Unresolved)); return TcpConnectorResponse::Error(Some(ConnectError::Unresolved));
@ -90,8 +101,9 @@ impl<T: Address> TcpConnectorResponse<T> {
ConnectAddrs::One(addr) => TcpConnectorResponse::Response { ConnectAddrs::One(addr) => TcpConnectorResponse::Response {
req: Some(req), req: Some(req),
port, port,
local_addr,
addrs: None, addrs: None,
stream: Some(ReusableBoxFuture::new(TcpStream::connect(addr))), stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))),
}, },
// when resolver returns multiple socket addr for request they would be popped from // when resolver returns multiple socket addr for request they would be popped from
@ -99,6 +111,7 @@ impl<T: Address> TcpConnectorResponse<T> {
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response { ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
req: Some(req), req: Some(req),
port, port,
local_addr,
addrs: Some(addrs), addrs: Some(addrs),
stream: None, stream: None,
}, },
@ -116,6 +129,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
TcpConnectorResponse::Response { TcpConnectorResponse::Response {
req, req,
port, port,
local_addr,
addrs, addrs,
stream, stream,
} => loop { } => loop {
@ -148,11 +162,38 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
// try to connect // try to connect
let addr = addrs.as_mut().unwrap().pop_front().unwrap(); let addr = addrs.as_mut().unwrap().pop_front().unwrap();
let fut = connect(addr, *local_addr);
match stream { match stream {
Some(rbf) => rbf.set(TcpStream::connect(addr)), Some(rbf) => rbf.set(fut),
None => *stream = Some(ReusableBoxFuture::new(TcpStream::connect(addr))), None => *stream = Some(ReusableBoxFuture::new(fut)),
} }
}, },
} }
} }
} }
async fn connect(addr: SocketAddr, local_addr: Option<IpAddr>) -> io::Result<TcpStream> {
// use local addr if connect asks for it.
match local_addr {
Some(ip_addr) => {
let socket = match ip_addr {
IpAddr::V4(ip_addr) => {
let socket = TcpSocket::new_v4()?;
let addr = SocketAddr::V4(SocketAddrV4::new(ip_addr, 0));
socket.bind(addr)?;
socket
}
IpAddr::V6(ip_addr) => {
let socket = TcpSocket::new_v6()?;
let addr = SocketAddr::V6(SocketAddrV6::new(ip_addr, 0, 0, 0));
socket.bind(addr)?;
socket
}
};
socket.connect(addr).await
}
None => TcpStream::connect(addr).await,
}
}

View File

@ -1,4 +1,9 @@
use std::io; #![cfg(feature = "connect")]
use std::{
io,
net::{IpAddr, Ipv4Addr},
};
use actix_codec::{BytesCodec, Framed}; use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
@ -9,7 +14,7 @@ use futures_util::sink::SinkExt;
use actix_tls::connect::{self as actix_connect, Connect}; use actix_tls::connect::{self as actix_connect, Connect};
#[cfg(all(feature = "connect", feature = "openssl"))] #[cfg(feature = "openssl")]
#[actix_rt::test] #[actix_rt::test]
async fn test_string() { async fn test_string() {
let srv = TestServer::with(|| { let srv = TestServer::with(|| {
@ -125,3 +130,25 @@ async fn test_rustls_uri() {
let con = conn.call(addr.into()).await.unwrap(); let con = conn.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
} }
#[actix_rt::test]
async fn test_local_addr() {
let srv = TestServer::with(|| {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
})
});
let conn = actix_connect::default_connector();
let local = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3));
let (con, _) = conn
.call(Connect::with_addr("10", srv.addr()).set_local_addr(local))
.await
.unwrap()
.into_parts();
assert_eq!(con.local_addr().unwrap().ip(), local)
}

View File

@ -1,3 +1,5 @@
#![cfg(feature = "connect")]
use std::{ use std::{
io, io,
net::{Ipv4Addr, SocketAddr}, net::{Ipv4Addr, SocketAddr},