1
0
mirror of https://github.com/actix/examples synced 2025-02-02 09:39:03 +01:00

Update udp-echo example to actix 0.9 and tokio 0.2. (#336)

This commit is contained in:
alvardes 2020-06-25 06:19:22 +02:00 committed by GitHub
parent b7ff0d1ffe
commit 5926035d6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 18 deletions

View File

@ -42,7 +42,7 @@ members = [
"template_tera", "template_tera",
"template_yarte", "template_yarte",
"todo", "todo",
# "udp-echo", "udp-echo",
"unix-socket", "unix-socket",
"web-cors/backend", "web-cors/backend",
"websocket", "websocket",

View File

@ -5,7 +5,10 @@ authors = ["Anton Patrushev <apatrushev@gmail.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
actix = "0.7" actix = "0.9"
tokio = "0.1" actix-rt = "1.1"
futures = "0.1" tokio = "0.2"
bytes = "0.4" tokio-util = { version = "0.3", features = [ "codec", "udp" ] }
futures = "0.3"
futures-util = "0.3"
bytes = "0.5"

View File

@ -1,42 +1,57 @@
use actix::io::SinkWrite;
use actix::{Actor, AsyncContext, Context, Message, StreamHandler}; use actix::{Actor, AsyncContext, Context, Message, StreamHandler};
use bytes::Bytes;
use bytes::BytesMut; use bytes::BytesMut;
use futures::stream::SplitSink; use futures::stream::SplitSink;
use futures::{Future, Sink, Stream}; use futures_util::stream::StreamExt;
use std::io::Result;
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::codec::BytesCodec; use tokio::net::UdpSocket;
use tokio::net::{UdpFramed, UdpSocket}; use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
type SinkItem = (Bytes, SocketAddr);
type UdpSink = SplitSink<UdpFramed<BytesCodec>, SinkItem>;
struct UdpActor { struct UdpActor {
sink: SplitSink<UdpFramed<BytesCodec>>, sink: SinkWrite<SinkItem, UdpSink>,
} }
impl Actor for UdpActor { impl Actor for UdpActor {
type Context = Context<Self>; type Context = Context<Self>;
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr); struct UdpPacket(BytesMut, SocketAddr);
impl StreamHandler<UdpPacket, std::io::Error> for UdpActor {
impl StreamHandler<UdpPacket> for UdpActor {
fn handle(&mut self, msg: UdpPacket, _: &mut Context<Self>) { fn handle(&mut self, msg: UdpPacket, _: &mut Context<Self>) {
println!("Received: ({:?}, {:?})", msg.0, msg.1); println!("Received: ({:?}, {:?})", msg.0, msg.1);
(&mut self.sink).send((msg.0.into(), msg.1)).wait().unwrap(); self.sink.write((msg.0.into(), msg.1)).unwrap();
} }
} }
fn main() { impl actix::io::WriteHandler<std::io::Error> for UdpActor {}
let sys = actix::System::new("echo-udp");
#[actix_rt::main]
async fn main() {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let sock = UdpSocket::bind(&addr).unwrap(); let sock = UdpSocket::bind(&addr).await.unwrap();
println!( println!(
"Started udp server on: 127.0.0.1:{:?}", "Started udp server on: 127.0.0.1:{:?}",
sock.local_addr().unwrap().port() sock.local_addr().unwrap().port()
); );
let (sink, stream) = UdpFramed::new(sock, BytesCodec::new()).split(); let (sink, stream) = UdpFramed::new(sock, BytesCodec::new()).split();
UdpActor::create(|ctx| { UdpActor::create(|ctx| {
ctx.add_stream(stream.map(|(data, sender)| UdpPacket(data, sender))); ctx.add_stream(
UdpActor { sink: sink } stream.filter_map(
|item: Result<(BytesMut, SocketAddr)>| async {
item.map(|(data, sender)| UdpPacket(data, sender)).ok()
},
),
);
UdpActor { sink: SinkWrite::new(sink, ctx), }
}); });
std::process::exit(sys.run()); actix_rt::Arbiter::local_join().await;
} }