//! `ClientSession` is an actor, it manages peer tcp connection and //! proxies commands from peer to `ChatServer`. use std::{ io, net, str::FromStr, time::{Duration, Instant}, }; use actix::{prelude::*, spawn}; use tokio::{ io::{split, WriteHalf}, net::{TcpListener, TcpStream}, }; use tokio_util::codec::FramedRead; use crate::{ codec::{ChatCodec, ChatRequest, ChatResponse}, server::{self, ChatServer}, }; /// Chat server sends this messages to session #[derive(Message)] #[rtype(result = "()")] pub struct Message(pub String); /// `ChatSession` actor is responsible for tcp peer communications. pub struct ChatSession { /// unique session id id: usize, /// this is address of chat server addr: Addr, /// Client must send ping at least once per 10 seconds, otherwise we drop /// connection. hb: Instant, /// joined room room: String, /// Framed wrapper framed: actix::io::FramedWrite, ChatCodec>, } impl Actor for ChatSession { /// For tcp communication we are going to use `FramedContext`. /// It is convenient wrapper around `Framed` object from `tokio_io` type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { // we'll start heartbeat process on session start. self.hb(ctx); // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. let addr = ctx.address(); self.addr .send(server::Connect { addr: addr.recipient(), }) .into_actor(self) .then(|res, act, ctx| { match res { Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } actix::fut::ready(()) }) .wait(ctx); } fn stopping(&mut self, _: &mut Self::Context) -> Running { // notify chat server self.addr.do_send(server::Disconnect { id: self.id }); Running::Stop } } impl actix::io::WriteHandler for ChatSession {} /// To use `Framed` we have to define Io type and Codec impl StreamHandler> for ChatSession { /// This is main event loop for client requests fn handle(&mut self, msg: Result, ctx: &mut Context) { match msg { Ok(ChatRequest::List) => { // Send ListRooms message to chat server and wait for response println!("List rooms"); self.addr .send(server::ListRooms) .into_actor(self) .then(|res, act, _| { match res { Ok(rooms) => { act.framed.write(ChatResponse::Rooms(rooms)); } _ => println!("Something is wrong"), } actix::fut::ready(()) }) .wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list of rooms back } Ok(ChatRequest::Join(name)) => { println!("Join to room: {name}"); self.room = name.clone(); self.addr.do_send(server::Join { id: self.id, name: name.clone(), }); self.framed.write(ChatResponse::Joined(name)); } Ok(ChatRequest::Message(message)) => { // send message to chat server println!("Peer message: {message}"); self.addr.do_send(server::Message { id: self.id, msg: message, room: self.room.clone(), }) } // we update heartbeat time on ping from peer Ok(ChatRequest::Ping) => self.hb = Instant::now(), _ => ctx.stop(), } } } /// Handler for Message, chat server sends this message, we just send string to /// peer impl Handler for ChatSession { type Result = (); fn handle(&mut self, msg: Message, _: &mut Context) { // send message to peer self.framed.write(ChatResponse::Message(msg.0)); } } /// Helper methods impl ChatSession { pub fn new( addr: Addr, framed: actix::io::FramedWrite, ChatCodec>, ) -> ChatSession { ChatSession { id: 0, addr, hb: Instant::now(), room: "main".to_owned(), framed, } } /// helper method that sends ping to client every second. /// /// also this method check heartbeats from client fn hb(&self, ctx: &mut Context) { ctx.run_interval(Duration::new(1, 0), |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { // heartbeat timed out println!("Client heartbeat failed, disconnecting!"); // notify chat server act.addr.do_send(server::Disconnect { id: act.id }); // stop actor ctx.stop(); } act.framed.write(ChatResponse::Ping); // if we can not send message to sink, sink is closed (disconnected) }); } } /// Define TCP server that will accept incoming TCP connection and create /// chat actors. pub fn tcp_server(_s: &str, server: Addr) { // Create server listener let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); spawn(async move { let listener = TcpListener::bind(&addr).await.unwrap(); while let Ok((stream, _)) = listener.accept().await { let server = server.clone(); ChatSession::create(|ctx| { let (r, w) = split(stream); ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx)) }); } }); }