
Restructure folders (#411)

Daniel T. Rodrigues
2021-02-25 21:57:58 -03:00
committed by GitHub
parent 9db98162b2
commit c3407627d0
334 changed files with 127 additions and 120 deletions


@@ -0,0 +1,12 @@
[package]
name = "async_data_factory"
version = "0.1.0"
authors = ["fakeshadow <24548779@qq.com>"]
edition = "2018"
[dependencies]
actix-web = "3"
num_cpus = "1.13"
redis = { version = "0.16.0", default-features = false, features = ["tokio-rt-core"] }
# redis_tang is a redis pool used for testing purposes
redis_tang = "0.1"


@@ -0,0 +1,23 @@
## Usage:
This example shows how to construct async state with `App::data_factory`.
## Reason:
`data_factory` makes sense in these situations:
- When async state does not necessarily have to be shared between workers/threads.
- When async state would spawn tasks on `actix-rt`. If we centralized the state, the tasks could end up very unevenly distributed across the workers/threads
(`actix-rt` spawns tasks on the local thread whenever it's called).
## Requirement:
- `rustc 1.43 stable`
- a `redis` server listening on `127.0.0.1:6379` (or pass the `REDIS_URL` env variable when starting the example)
## Endpoints:
- use a workload generator (e.g. wrk) to benchmark the endpoints; an example invocation follows the list:
http://127.0.0.1:8080/pool prebuilt shared redis pool
http://127.0.0.1:8080/pool2 data_factory redis pool
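For instance, assuming [wrk](https://github.com/wg/wrk) is installed (the thread and connection counts here are arbitrary):
```sh
wrk -t4 -c64 -d10s http://127.0.0.1:8080/pool
wrk -t4 -c64 -d10s http://127.0.0.1:8080/pool2
```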
## Context:
The real-world difference will vary with the work you are doing, but in general it's a good idea to
spread your *identical* async tasks evenly between threads and have as little cross-thread synchronization as possible. A minimal sketch of the pattern follows.
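As a minimal sketch of per-worker state with `data_factory`, independent of redis (the `u32` payload is just a stand-in):
```rust
use actix_web::{web, App, HttpServer};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            // The factory future is polled once per worker; the resulting
            // Data<u32> is NOT shared between workers.
            .data_factory(|| async { Ok::<u32, ()>(42) })
            .route("/", web::get().to(|n: web::Data<u32>| async move {
                format!("worker-local value: {}", n.get_ref())
            }))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}
```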


@@ -0,0 +1,107 @@
use actix_web::web::Data;
use actix_web::{get, App, HttpServer};
use redis_tang::{Builder, Pool, RedisManager};
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1"));
let num_cpus = num_cpus::get();
// a shared redis pool for workload comparison.
let pool = pool_builder(num_cpus, redis_url.as_str())
.await
.expect("failed to build pool");
let pool = RedisWrapper(pool);
HttpServer::new(move || {
let redis_url = redis_url.clone();
App::new()
.data(pool.clone())
// a dummy data_factory implementation
.data_factory(|| {
/*
App::data_factory accepts a closure returning a future and polls that future
when the App is initialized.
The output of the future must be Result<T, E>, and T will be transformed into
Data<T>, which can be extracted in handlers/requests.
(The E is only used to trigger a log::error.)
This data is bound to the worker thread and you get an instance of it for every
worker of the HttpServer (hence the name data_factory).
*. It is NOT shared between workers
(unless the underlying data is a smart pointer like Arc<T>).
*/
async {
// 123usize would be transformed into Data<usize>
Ok::<usize, ()>(123)
}
})
// a data_factory redis pool for work load comparison.
.data_factory(move || pool_builder(1, redis_url.clone()))
.service(pool_shared_prebuilt)
.service(pool_local)
})
.bind("127.0.0.1:8080")?
.run()
.await
}
/*
This pool is shared between workers. All redis connections spawn their tasks on the
main thread, which puts too much pressure on one thread.
*. This is the case for redis::aio::MultiplexedConnection and may not apply to other
async redis connection types.
*/
#[get("/pool")]
async fn pool_shared_prebuilt(pool: Data<RedisWrapper>) -> &'static str {
ping(&pool.as_ref().0).await
}
/*
This pool is built with App::data_factory, with 2 connections fixed for every worker.
The load is evenly distributed and there is no cross-worker synchronization.
*/
#[get("/pool2")]
async fn pool_local(data: Data<usize>, pool: Data<Pool<RedisManager>>) -> &'static str {
assert_eq!(data.get_ref(), &123);
ping(pool.as_ref()).await
}
// boilerplate for the redis pool
#[derive(Clone)]
struct RedisWrapper(Pool<RedisManager>);
async fn pool_builder(
num_cpus: usize,
redis_url: impl redis::IntoConnectionInfo,
) -> Result<Pool<RedisManager>, ()> {
let mgr = RedisManager::new(redis_url);
Builder::new()
.always_check(false)
.idle_timeout(None)
.max_lifetime(None)
.min_idle(num_cpus * 2)
.max_size(num_cpus * 2)
.build(mgr)
.await
.map_err(|_| ())
}
async fn ping(pool: &Pool<RedisManager>) -> &'static str {
let mut client = pool.get().await.unwrap().clone();
redis::cmd("PING")
.query_async::<_, ()>(&mut client)
.await
.unwrap();
"Done"
}

other/protobuf/Cargo.toml

@@ -0,0 +1,15 @@
[package]
name = "protobuf-example"
version = "0.2.0"
authors = ["kingxsp <jin.hb.zh@outlook.com>, Yuki Okushi <huyuumi.dev@gmail.com>"]
edition = "2018"
[dependencies]
bytes = "0.5.4"
env_logger = "0.7.1"
prost = "0.6.1"
prost-derive = "0.6.1"
actix = "0.10"
actix-protobuf = "0.6"
actix-web = "3"

other/protobuf/README.md

@@ -0,0 +1,28 @@
# protobuf
## Usage
### Server
```shell
# From workspace
cargo run --bin protobuf-example
# From ./protobuf
cargo run
```
### Client
```shell
# Dependencies
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.2/protobuf-python-3.11.2.zip
unzip protobuf-python-3.11.2.zip
cd protobuf-3.11.2/python/
python3 setup.py install
pip3 install --upgrade pip
pip3 install aiohttp
# Client
python3 client.py
```
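If you only want to poke the endpoint without Python, you can POST a pre-serialized message with curl (assuming `payload.bin` contains a serialized `MyObj`):
```shell
curl -X POST -H "content-type: application/protobuf" \
     --data-binary @payload.bin http://127.0.0.1:8081/
```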

other/protobuf/client.py

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
# just start server and run client.py
# wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.2/protobuf-python-3.11.2.zip
# unzip protobuf-python-3.11.2.zip
# cd protobuf-3.11.2/python/
# python3 setup.py install
# pip3 install --upgrade pip
# pip3 install aiohttp
# python3 client.py
import test_pb2
import traceback
import sys
import asyncio
import aiohttp
def op():
    try:
        obj = test_pb2.MyObj()
        obj.number = 9
        obj.name = 'USB'
        # Serialize
        sendDataStr = obj.SerializeToString()
        # print serialized string value
        print('serialized string:', sendDataStr)
        # ------------------------ #
        #   message transmission   #
        # ------------------------ #
        receiveDataStr = sendDataStr
        receiveData = test_pb2.MyObj()
        # Deserialize
        receiveData.ParseFromString(receiveDataStr)
        print('parsed serialized string, returned: devId =', receiveData.number, ', name =', receiveData.name)
    except Exception as e:
        print(Exception, ':', e)
        traceback.print_exc()
        errInfo = sys.exc_info()
        print(errInfo[0], ':', errInfo[1])

async def fetch(session):
    obj = test_pb2.MyObj()
    obj.number = 9
    obj.name = 'USB'
    async with session.post('http://127.0.0.1:8081/',
                            data=obj.SerializeToString(),
                            headers={"content-type": "application/protobuf"}) as resp:
        print(resp.status)
        data = await resp.read()
        receiveObj = test_pb2.MyObj()
        receiveObj.ParseFromString(data)
        print(receiveObj)

async def go(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        await fetch(session)

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()


@@ -0,0 +1,35 @@
#[macro_use]
extern crate prost_derive;
use actix_protobuf::*;
use actix_web::*;
#[derive(Clone, PartialEq, Message)]
pub struct MyObj {
#[prost(int32, tag = "1")]
pub number: i32,
#[prost(string, tag = "2")]
pub name: String,
}
async fn index(msg: ProtoBuf<MyObj>) -> Result<HttpResponse> {
println!("model: {:?}", msg);
HttpResponse::Ok().protobuf(msg.0) // <- send response
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "actix_web=debug,actix_server=info");
env_logger::init();
HttpServer::new(|| {
App::new()
.wrap(middleware::Logger::default())
.service(web::resource("/").route(web::post().to(index)))
})
.bind("127.0.0.1:8081")?
.shutdown_timeout(1)
.run()
.await
}


@@ -0,0 +1,6 @@
syntax = "proto3";
message MyObj {
int32 number = 1;
string name = 2;
}
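The `test_pb2.py` module below is generated from this file. Assuming the `protoc` compiler is installed, it can be regenerated with:
```shell
protoc --python_out=. test.proto
```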


@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: test.proto
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='test.proto',
package='',
syntax='proto3',
serialized_options=None,
serialized_pb=b'\n\ntest.proto\"%\n\x05MyObj\x12\x0e\n\x06number\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\tb\x06proto3'
)
_MYOBJ = _descriptor.Descriptor(
name='MyObj',
full_name='MyObj',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='number', full_name='MyObj.number', index=0,
number=1, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='name', full_name='MyObj.name', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=14,
serialized_end=51,
)
DESCRIPTOR.message_types_by_name['MyObj'] = _MYOBJ
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
MyObj = _reflection.GeneratedProtocolMessageType('MyObj', (_message.Message,), {
'DESCRIPTOR' : _MYOBJ,
'__module__' : 'test_pb2'
# @@protoc_insertion_point(class_scope:MyObj)
})
_sym_db.RegisterMessage(MyObj)
# @@protoc_insertion_point(module_scope)


@@ -0,0 +1,11 @@
[package]
name = "server-sent-events"
version = "1.0.0"
authors = ["Arve Seljebu"]
edition = "2018"
[dependencies]
actix-web = "3"
env_logger = "0.8"
futures = "0.3.1"
tokio = { version = "0.2", features = ["sync"] }


@@ -0,0 +1,40 @@
# actix-sse
Example of server-sent events, aka `EventSource`, with actix web.
```sh
cargo run
```
Open http://localhost:8080/ with a browser, then send events with another HTTP client:
```sh
curl localhost:8080/broadcast/my_message
```
*my_message* should appear in the browser with a timestamp.
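You can also follow the event stream from the command line (`-N` tells curl not to buffer the response):
```sh
curl -N localhost:8080/events
```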
## Performance
This implementation serves thousands of clients on a 2013 MacBook Air without problems.
Run [benchmark.js](benchmark.js) to benchmark your own system:
```sh
$ node benchmark.js
Connected: 1000, connection time: 867 ms, total broadcast time: 23 ms^C⏎
```
### Error *Too many open files*
You may be limited by the maximum number of open connections (open file descriptors). To set the maximum number of open file descriptors to 2048:
```sh
ulimit -n 2048
```
Test maximum number of open connections with [drain.js](drain.js):
```sh
$ node drain.js
Connections dropped: 5957, accepting connections: false^C⏎
```
_Accepting connections_ indicates whether the server has exhausted its resources.


@@ -0,0 +1,60 @@
const http = require('http')
const n = 100;
let connected = 0;
let messages = 0;
let start = Date.now();
let phase = 'connecting';
let connection_time;
let broadcast_time;
let message = process.argv[2] || 'msg';
let expected_data = "data: " + message;
for (let i = 0; i < n; i++) {
http.get({
host: 'localhost',
port: 8080,
path: '/events'
}, response => {
response.on('data', data => {
if (data.includes(expected_data)) {
messages += 1;
} else if (data.includes("data: connected\n")) {
connected += 1;
}
})
}).on('error', (_) => {});
}
setInterval(() => {
if (phase === 'connecting' && connected === n) {
// done connecting
phase = 'messaging';
connection_time = Date.now() - start;
}
if (phase === 'messaging') {
phase = 'waiting';
start = Date.now();
http.get({
host: 'localhost',
port: 8080,
path: '/broadcast/' + message
}, response => {
response.on('data', _ => {})
})
}
if (phase === 'waiting' && messages >= n) {
// all messages received
broadcast_time = Date.now() - start;
phase = 'paused';
messages = 0;
phase = 'messaging';
}
process.stdout.write("\r\x1b[K");
process.stdout.write(`Connected: ${connected}, connection time: ${connection_time} ms, total broadcast time: ${broadcast_time} ms`);
}, 20)


@@ -0,0 +1,36 @@
const http = require('http')
let drop_goal = 10_000;
let dropped = 0;
let query = {
host: 'localhost',
port: 8080,
path: '/events'
}
setInterval(() => {
if (dropped < drop_goal) {
let request = http.get(query, response => {
response.on('data', data => {
if (data.includes("data: connected\n")) {
// drop connection after welcome message
dropped += 1;
request.abort()
}
})
})
.on('error', () => {})
}
}, 1)
setInterval(() => {
http.get('http://localhost:8080/', () => print_status(true))
.setTimeout(100, () => print_status(false))
.on('error', () => {})
}, 20)
function print_status(accepting_connections) {
process.stdout.write("\r\x1b[K");
process.stdout.write(`Connections dropped: ${dropped}, accepting connections: ${accepting_connections}`);
}


@@ -0,0 +1,27 @@
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Server-sent events</title>
<style>
p {
margin-top: 0.5em;
margin-bottom: 0.5em;
}
</style>
</head>
<body>
<div id="root"></div>
<script>
let root = document.getElementById("root");
let events = new EventSource("/events");
events.onmessage = (event) => {
let data = document.createElement("p");
let time = new Date().toLocaleTimeString();
data.innerText = time + ": " + event.data;
root.appendChild(data);
}
</script>
</body>
</html>


@@ -0,0 +1,132 @@
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};
use std::time::Duration;
use actix_web::rt::time::{interval_at, Instant};
use actix_web::web::{Bytes, Data, Path};
use actix_web::{web, App, Error, HttpResponse, HttpServer, Responder};
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::{channel, Receiver, Sender};
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init();
let data = Broadcaster::create();
HttpServer::new(move || {
App::new()
.app_data(data.clone())
.route("/", web::get().to(index))
.route("/events", web::get().to(new_client))
.route("/broadcast/{msg}", web::get().to(broadcast))
})
.bind("127.0.0.1:8080")?
.run()
.await
}
async fn index() -> impl Responder {
let content = include_str!("index.html");
HttpResponse::Ok()
.header("content-type", "text/html")
.body(content)
}
async fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder {
let rx = broadcaster.lock().unwrap().new_client();
HttpResponse::Ok()
.header("content-type", "text/event-stream")
.streaming(rx)
}
async fn broadcast(
msg: Path<String>,
broadcaster: Data<Mutex<Broadcaster>>,
) -> impl Responder {
broadcaster.lock().unwrap().send(&msg.into_inner());
HttpResponse::Ok().body("msg sent")
}
struct Broadcaster {
clients: Vec<Sender<Bytes>>,
}
impl Broadcaster {
fn create() -> Data<Mutex<Self>> {
// Data ≃ Arc
let me = Data::new(Mutex::new(Broadcaster::new()));
// ping clients every 10 seconds to see if they are alive
Broadcaster::spawn_ping(me.clone());
me
}
fn new() -> Self {
Broadcaster {
clients: Vec::new(),
}
}
fn spawn_ping(me: Data<Mutex<Self>>) {
actix_web::rt::spawn(async move {
let mut task = interval_at(Instant::now(), Duration::from_secs(10));
while task.next().await.is_some() {
me.lock().unwrap().remove_stale_clients();
}
})
}
fn remove_stale_clients(&mut self) {
let mut ok_clients = Vec::new();
for client in self.clients.iter() {
let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
if let Ok(()) = result {
ok_clients.push(client.clone());
}
}
self.clients = ok_clients;
}
fn new_client(&mut self) -> Client {
let (tx, rx) = channel(100);
tx.clone()
.try_send(Bytes::from("data: connected\n\n"))
.unwrap();
self.clients.push(tx);
Client(rx)
}
fn send(&self, msg: &str) {
let msg = Bytes::from(["data: ", msg, "\n\n"].concat());
for client in self.clients.iter() {
client.clone().try_send(msg.clone()).unwrap_or(());
}
}
}
// wrap Receiver in own type, with correct error type
struct Client(Receiver<Bytes>);
impl Stream for Client {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

other/udp-echo/Cargo.toml

@@ -0,0 +1,14 @@
[package]
name = "udp-echo"
version = "0.1.0"
authors = ["Anton Patrushev <apatrushev@gmail.com>"]
edition = "2018"
[dependencies]
actix = "0.10"
actix-rt = "1.1"
tokio = "0.2"
tokio-util = { version = "0.3", features = [ "codec", "udp" ] }
futures = "0.3"
futures-util = "0.3"
bytes = "0.5"

other/udp-echo/README.md

@@ -0,0 +1,18 @@
# udp-echo
## Usage
### server
```bash
cd other/udp-echo
cargo run
# Started udp server on: 127.0.0.1:<port>
```
### socat client
Copy the port printed in the server output and run the following command
(replacing `12345` with that port) to communicate with the UDP server:
```bash
socat - UDP4:localhost:12345
```
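Alternatively, assuming `netcat` is installed, you can exchange datagrams with:
```bash
nc -u localhost 12345
```
Lines typed into either client are echoed back by the server.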


@@ -0,0 +1,57 @@
use actix::io::SinkWrite;
use actix::{Actor, AsyncContext, Context, Message, StreamHandler};
use bytes::Bytes;
use bytes::BytesMut;
use futures::stream::SplitSink;
use futures_util::stream::StreamExt;
use std::io::Result;
use std::net::SocketAddr;
use tokio::net::UdpSocket;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
type SinkItem = (Bytes, SocketAddr);
type UdpSink = SplitSink<UdpFramed<BytesCodec>, SinkItem>;
struct UdpActor {
sink: SinkWrite<SinkItem, UdpSink>,
}
impl Actor for UdpActor {
type Context = Context<Self>;
}
#[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr);
impl StreamHandler<UdpPacket> for UdpActor {
fn handle(&mut self, msg: UdpPacket, _: &mut Context<Self>) {
println!("Received: ({:?}, {:?})", msg.0, msg.1);
self.sink.write((msg.0.into(), msg.1)).unwrap();
}
}
impl actix::io::WriteHandler<std::io::Error> for UdpActor {}
#[actix_rt::main]
async fn main() {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let sock = UdpSocket::bind(&addr).await.unwrap();
println!(
"Started udp server on: 127.0.0.1:{:?}",
sock.local_addr().unwrap().port()
);
let (sink, stream) = UdpFramed::new(sock, BytesCodec::new()).split();
UdpActor::create(|ctx| {
ctx.add_stream(stream.filter_map(
|item: Result<(BytesMut, SocketAddr)>| async {
item.map(|(data, sender)| UdpPacket(data, sender)).ok()
},
));
UdpActor {
sink: SinkWrite::new(sink, ctx),
}
});
actix_rt::Arbiter::local_join().await;
}


@@ -0,0 +1,9 @@
[package]
name = "unix-socket"
version = "1.0.0"
authors = ["Messense Lv <messense@icloud.com>"]
edition = "2018"
[dependencies]
env_logger = "0.8"
actix-web = "3"


@@ -0,0 +1,14 @@
## Unix domain socket example
```bash
$ curl --unix-socket /tmp/actix-uds.socket http://localhost/
Hello world!
```
Note that, according to the
[documentation](https://actix.github.io/actix-web/actix_web/struct.HttpServer.html#method.bind_uds),
only one thread is used for handling incoming connections.
Also, the socket file (`/tmp/actix-uds.socket`) is not deleted when the server
stops, so the next run will fail to start unless you remove the socket file
manually; a simple workaround follows.
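For example, assuming a Unix shell, remove any stale socket before starting the server:
```bash
rm -f /tmp/actix-uds.socket
cargo run
```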


@@ -0,0 +1,32 @@
use actix_web::{middleware, web, App, HttpRequest, HttpServer};
async fn index(_req: HttpRequest) -> &'static str {
"Hello world!"
}
#[actix_web::main]
#[cfg(unix)]
async fn main() -> std::io::Result<()> {
::std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info");
env_logger::init();
HttpServer::new(|| {
App::new()
// enable logger - always register actix-web Logger middleware last
.wrap(middleware::Logger::default())
.service(
web::resource("/index.html")
.route(web::get().to(|| async { "Hello world!" })),
)
.service(web::resource("/").to(index))
})
.bind_uds("/tmp/actix-uds.socket")?
.run()
.await
}
#[cfg(not(unix))]
fn main() -> std::io::Result<()> {
println!("not supported");
Ok(())
}