From 5926035d6cbd543ef9831410566351a63f7a2324 Mon Sep 17 00:00:00 2001 From: alvardes Date: Thu, 25 Jun 2020 06:19:22 +0200 Subject: [PATCH] Update udp-echo example to actix 0.9 and tokio 0.2. (#336) --- Cargo.toml | 2 +- udp-echo/Cargo.toml | 11 +++++++---- udp-echo/src/main.rs | 41 ++++++++++++++++++++++++++++------------- 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cb65c0fa..6487165b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ members = [ "template_tera", "template_yarte", "todo", -# "udp-echo", + "udp-echo", "unix-socket", "web-cors/backend", "websocket", diff --git a/udp-echo/Cargo.toml b/udp-echo/Cargo.toml index 37da872e..380ec0e7 100644 --- a/udp-echo/Cargo.toml +++ b/udp-echo/Cargo.toml @@ -5,7 +5,10 @@ authors = ["Anton Patrushev "] edition = "2018" [dependencies] -actix = "0.7" -tokio = "0.1" -futures = "0.1" -bytes = "0.4" +actix = "0.9" +actix-rt = "1.1" +tokio = "0.2" +tokio-util = { version = "0.3", features = [ "codec", "udp" ] } +futures = "0.3" +futures-util = "0.3" +bytes = "0.5" diff --git a/udp-echo/src/main.rs b/udp-echo/src/main.rs index b926020b..819992de 100644 --- a/udp-echo/src/main.rs +++ b/udp-echo/src/main.rs @@ -1,42 +1,57 @@ +use actix::io::SinkWrite; use actix::{Actor, AsyncContext, Context, Message, StreamHandler}; +use bytes::Bytes; use bytes::BytesMut; use futures::stream::SplitSink; -use futures::{Future, Sink, Stream}; +use futures_util::stream::StreamExt; +use std::io::Result; use std::net::SocketAddr; -use tokio::codec::BytesCodec; -use tokio::net::{UdpFramed, UdpSocket}; +use tokio::net::UdpSocket; +use tokio_util::codec::BytesCodec; +use tokio_util::udp::UdpFramed; + +type SinkItem = (Bytes, SocketAddr); +type UdpSink = SplitSink, SinkItem>; struct UdpActor { - sink: SplitSink>, + sink: SinkWrite, } impl Actor for UdpActor { type Context = Context; } #[derive(Message)] +#[rtype(result = "()")] struct UdpPacket(BytesMut, SocketAddr); -impl StreamHandler for UdpActor { + +impl StreamHandler for UdpActor { fn handle(&mut self, msg: UdpPacket, _: &mut Context) { println!("Received: ({:?}, {:?})", msg.0, msg.1); - (&mut self.sink).send((msg.0.into(), msg.1)).wait().unwrap(); + self.sink.write((msg.0.into(), msg.1)).unwrap(); } } -fn main() { - let sys = actix::System::new("echo-udp"); +impl actix::io::WriteHandler for UdpActor {} +#[actix_rt::main] +async fn main() { let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - let sock = UdpSocket::bind(&addr).unwrap(); + let sock = UdpSocket::bind(&addr).await.unwrap(); println!( "Started udp server on: 127.0.0.1:{:?}", sock.local_addr().unwrap().port() ); - let (sink, stream) = UdpFramed::new(sock, BytesCodec::new()).split(); UdpActor::create(|ctx| { - ctx.add_stream(stream.map(|(data, sender)| UdpPacket(data, sender))); - UdpActor { sink: sink } + ctx.add_stream( + stream.filter_map( + |item: Result<(BytesMut, SocketAddr)>| async { + item.map(|(data, sender)| UdpPacket(data, sender)).ok() + }, + ), + ); + UdpActor { sink: SinkWrite::new(sink, ctx), } }); - std::process::exit(sys.run()); + actix_rt::Arbiter::local_join().await; }