use actix_rt::Arbiter; use actix_web::error::ErrorInternalServerError; use actix_web::web::{Bytes, Data, Path}; use actix_web::{web, App, Error, HttpResponse, HttpServer, Responder}; use env_logger; use tokio::prelude::*; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::timer::Interval; use std::sync::Mutex; use std::time::{Duration, Instant}; fn main() { env_logger::init(); let data = Broadcaster::create(); HttpServer::new(move || { App::new() .register_data(data.clone()) .route("/", web::get().to(index)) .route("/events", web::get().to(new_client)) .route("/broadcast/{msg}", web::get().to(broadcast)) }) .bind("127.0.0.1:8080") .expect("Unable to bind port") .run() .unwrap(); } fn index() -> impl Responder { let content = include_str!("index.html"); HttpResponse::Ok() .header("content-type", "text/html") .body(content) } fn new_client(broadcaster: Data>) -> impl Responder { let rx = broadcaster.lock().unwrap().new_client(); HttpResponse::Ok() .header("content-type", "text/event-stream") .no_chunking() .streaming(rx) } fn broadcast(msg: Path, broadcaster: Data>) -> impl Responder { broadcaster.lock().unwrap().send(&msg.into_inner()); HttpResponse::Ok().body("msg sent") } struct Broadcaster { clients: Vec>, } impl Broadcaster { fn create() -> Data> { // Data ≃ Arc let me = Data::new(Mutex::new(Broadcaster::new())); // ping clients every 10 seconds to see if they are alive Broadcaster::spawn_ping(me.clone()); me } fn new() -> Self { Broadcaster { clients: Vec::new(), } } fn spawn_ping(me: Data>) { let task = Interval::new(Instant::now(), Duration::from_secs(10)) .for_each(move |_| { me.lock().unwrap().remove_stale_clients(); Ok(()) }) .map_err(|e| panic!("interval errored; err={:?}", e)); Arbiter::spawn(task); } fn remove_stale_clients(&mut self) { let mut ok_clients = Vec::new(); for client in self.clients.iter() { let result = client.clone().try_send(Bytes::from("data: ping\n\n")); if let Ok(()) = result { ok_clients.push(client.clone()); } } self.clients = ok_clients; } fn new_client(&mut self) -> Client { let (tx, rx) = channel(100); tx.clone() .try_send(Bytes::from("data: connected\n\n")) .unwrap(); self.clients.push(tx); Client(rx) } fn send(&self, msg: &str) { let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); for client in self.clients.iter() { client.clone().try_send(msg.clone()).unwrap_or(()); } } } // wrap Receiver in own type, with correct error type struct Client(Receiver); impl Stream for Client { type Item = Bytes; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { self.0.poll().map_err(ErrorInternalServerError) } }