1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

upgrade example to actix-web 0.7

This commit is contained in:
Nikolay Kim 2018-07-16 12:36:53 +06:00
parent 0b52c7850e
commit 2cc1b23761
57 changed files with 913 additions and 693 deletions

946
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,8 +4,10 @@ version = "0.1.0"
authors = ["Darin Gordon <dkcdkg@gmail.com>"] authors = ["Darin Gordon <dkcdkg@gmail.com>"]
[dependencies] [dependencies]
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
dotenv = "0.10" dotenv = "0.10"
env_logger = "0.5" env_logger = "0.5"
failure = "0.1.1" failure = "0.1.1"

View File

@ -1,31 +1,28 @@
use actix::prelude::*; use actix::prelude::*;
use std::{time::Duration, thread::sleep};
use failure::Error; use failure::Error;
use r2d2; use r2d2;
use r2d2_sqlite; use r2d2_sqlite;
use std::{thread::sleep, time::Duration};
pub type Pool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>; pub type Pool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type Connection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>; pub type Connection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
pub struct DbExecutor(pub Pool); pub struct DbExecutor(pub Pool);
impl Actor for DbExecutor { impl Actor for DbExecutor {
type Context = SyncContext<Self>; type Context = SyncContext<Self>;
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum WeatherAgg { pub enum WeatherAgg {
AnnualAgg {year: i32, total: f64}, AnnualAgg { year: i32, total: f64 },
MonthAgg {year: i32, month: i32, total: f64} MonthAgg { year: i32, month: i32, total: f64 },
} }
pub enum Queries { pub enum Queries {
GetTopTenHottestYears, GetTopTenHottestYears,
GetTopTenColdestYears, GetTopTenColdestYears,
GetTopTenHottestMonths, GetTopTenHottestMonths,
GetTopTenColdestMonths GetTopTenColdestMonths,
} }
//pub struct GetTopTenHottestYears; //pub struct GetTopTenHottestYears;
@ -41,7 +38,7 @@ impl Handler<Queries> for DbExecutor {
match msg { match msg {
Queries::GetTopTenHottestYears => get_hottest_years(conn), Queries::GetTopTenHottestYears => get_hottest_years(conn),
Queries::GetTopTenColdestYears => get_coldest_years(conn), Queries::GetTopTenColdestYears => get_coldest_years(conn),
Queries::GetTopTenHottestMonths =>get_hottest_months(conn), Queries::GetTopTenHottestMonths => get_hottest_months(conn),
Queries::GetTopTenColdestMonths => get_coldest_months(conn), Queries::GetTopTenColdestMonths => get_coldest_months(conn),
} }
} }
@ -56,13 +53,17 @@ fn get_hottest_years(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
GROUP BY theyear GROUP BY theyear
ORDER BY total DESC LIMIT 10;"; ORDER BY total DESC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?; let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| { let annuals = prep_stmt
WeatherAgg::AnnualAgg{year: row.get(0), .query_map(&[], |row| WeatherAgg::AnnualAgg {
total: row.get(1)}}) year: row.get(0),
.and_then(|mapped_rows| total: row.get(1),
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?; })
.and_then(|mapped_rows| {
Ok(mapped_rows
.map(|row| row.unwrap())
.collect::<Vec<WeatherAgg>>())
})?;
sleep(Duration::from_secs(2)); sleep(Duration::from_secs(2));
@ -79,11 +80,16 @@ fn get_coldest_years(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
ORDER BY total ASC LIMIT 10;"; ORDER BY total ASC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?; let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| { let annuals = prep_stmt
WeatherAgg::AnnualAgg{year: row.get(0), .query_map(&[], |row| WeatherAgg::AnnualAgg {
total: row.get(1)}}) year: row.get(0),
.and_then(|mapped_rows| total: row.get(1),
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?; })
.and_then(|mapped_rows| {
Ok(mapped_rows
.map(|row| row.unwrap())
.collect::<Vec<WeatherAgg>>())
})?;
sleep(Duration::from_secs(2)); sleep(Duration::from_secs(2));
@ -91,8 +97,7 @@ fn get_coldest_years(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
} }
fn get_hottest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> { fn get_hottest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let stmt = let stmt = "SELECT cast(strftime('%Y', date) as int) as theyear,
"SELECT cast(strftime('%Y', date) as int) as theyear,
cast(strftime('%m', date) as int) as themonth, cast(strftime('%m', date) as int) as themonth,
sum(tmax) as total sum(tmax) as total
FROM nyc_weather FROM nyc_weather
@ -101,20 +106,24 @@ fn get_hottest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
ORDER BY total DESC LIMIT 10;"; ORDER BY total DESC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?; let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| { let annuals = prep_stmt
WeatherAgg::MonthAgg{year: row.get(0), .query_map(&[], |row| WeatherAgg::MonthAgg {
month: row.get(1), year: row.get(0),
total: row.get(2)}}) month: row.get(1),
.and_then(|mapped_rows| total: row.get(2),
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?; })
.and_then(|mapped_rows| {
Ok(mapped_rows
.map(|row| row.unwrap())
.collect::<Vec<WeatherAgg>>())
})?;
sleep(Duration::from_secs(2)); sleep(Duration::from_secs(2));
Ok(annuals) Ok(annuals)
} }
fn get_coldest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> { fn get_coldest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let stmt = let stmt = "SELECT cast(strftime('%Y', date) as int) as theyear,
"SELECT cast(strftime('%Y', date) as int) as theyear,
cast(strftime('%m', date) as int) as themonth, cast(strftime('%m', date) as int) as themonth,
sum(tmax) as total sum(tmax) as total
FROM nyc_weather FROM nyc_weather
@ -123,12 +132,17 @@ fn get_coldest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
ORDER BY total ASC LIMIT 10;"; ORDER BY total ASC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?; let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| { let annuals = prep_stmt
WeatherAgg::MonthAgg{year: row.get(0), .query_map(&[], |row| WeatherAgg::MonthAgg {
month: row.get(1), year: row.get(0),
total: row.get(2)}}) month: row.get(1),
.and_then(|mapped_rows| total: row.get(2),
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?; })
.and_then(|mapped_rows| {
Ok(mapped_rows
.map(|row| row.unwrap())
.collect::<Vec<WeatherAgg>>())
})?;
sleep(Duration::from_secs(2)); sleep(Duration::from_secs(2));
Ok(annuals) Ok(annuals)

View File

@ -10,7 +10,7 @@ This project illustrates two examples:
*/ */
#[macro_use] extern crate actix; extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate env_logger; extern crate env_logger;
extern crate failure; extern crate failure;
@ -19,51 +19,61 @@ extern crate num_cpus;
extern crate r2d2; extern crate r2d2;
extern crate r2d2_sqlite; extern crate r2d2_sqlite;
extern crate serde; extern crate serde;
#[macro_use] extern crate serde_derive; #[macro_use]
extern crate serde_derive;
extern crate serde_json; extern crate serde_json;
use actix::prelude::*; use actix::prelude::*;
use actix_web::{ use actix_web::{
http, middleware, server, App, AsyncResponder, FutureResponse, HttpResponse, http, middleware, server, App, AsyncResponder, Error as AWError, FutureResponse,
State, Error as AWError HttpResponse, State,
}; };
use std::error::Error as StdError; use futures::future::{join_all, ok as fut_ok, Future};
use failure::Error;
use futures::future::{Future, join_all, ok as fut_ok, err as fut_err};
use r2d2_sqlite::SqliteConnectionManager; use r2d2_sqlite::SqliteConnectionManager;
mod db; mod db;
use db::{DbExecutor, Queries, WeatherAgg, Pool}; use db::{DbExecutor, Pool, Queries, WeatherAgg};
/// State with DbExecutor address /// State with DbExecutor address
struct AppState { struct AppState {
db: Addr<Syn, DbExecutor>, db: Addr<DbExecutor>,
} }
/// Version 1: Calls 4 queries in sequential order, as an asynchronous handler /// Version 1: Calls 4 queries in sequential order, as an asynchronous handler
fn asyncio_weather(state: State<AppState>) -> FutureResponse<HttpResponse> { fn asyncio_weather(state: State<AppState>) -> FutureResponse<HttpResponse> {
let mut result: Vec<Vec<WeatherAgg>> = vec![]; let mut result: Vec<Vec<WeatherAgg>> = vec![];
state.db.send(Queries::GetTopTenHottestYears).from_err() state
.and_then(move |res| { .db
result.push(res.unwrap()); .send(Queries::GetTopTenHottestYears)
state.db.send(Queries::GetTopTenColdestYears).from_err() .from_err()
.and_then(move |res| { .and_then(move |res| {
result.push(res.unwrap()); result.push(res.unwrap());
state.db.send(Queries::GetTopTenHottestMonths).from_err() state
.and_then(move |res| { .db
result.push(res.unwrap()); .send(Queries::GetTopTenColdestYears)
state.db.send(Queries::GetTopTenColdestMonths).from_err() .from_err()
.and_then(move |res| { .and_then(move |res| {
result.push(res.unwrap()); result.push(res.unwrap());
fut_ok(result) state
}) .db
}) .send(Queries::GetTopTenHottestMonths)
.from_err()
.and_then(move |res| {
result.push(res.unwrap());
state
.db
.send(Queries::GetTopTenColdestMonths)
.from_err()
.and_then(move |res| {
result.push(res.unwrap());
fut_ok(result)
})
})
})
}) })
}) .and_then(|res| Ok(HttpResponse::Ok().json(res)))
.and_then(|res| Ok(HttpResponse::Ok().json(res))) .responder()
.responder()
} }
/// Version 2: Calls 4 queries in parallel, as an asynchronous handler /// Version 2: Calls 4 queries in parallel, as an asynchronous handler
@ -73,12 +83,13 @@ fn parallel_weather(state: State<AppState>) -> FutureResponse<HttpResponse> {
Box::new(state.db.send(Queries::GetTopTenHottestYears)), Box::new(state.db.send(Queries::GetTopTenHottestYears)),
Box::new(state.db.send(Queries::GetTopTenColdestYears)), Box::new(state.db.send(Queries::GetTopTenColdestYears)),
Box::new(state.db.send(Queries::GetTopTenHottestMonths)), Box::new(state.db.send(Queries::GetTopTenHottestMonths)),
Box::new(state.db.send(Queries::GetTopTenColdestMonths))]; Box::new(state.db.send(Queries::GetTopTenColdestMonths)),
];
join_all(fut_result) join_all(fut_result)
.map_err(AWError::from) .map_err(AWError::from)
.and_then(|result| { .and_then(|result| {
let res: Vec<Option<Vec<WeatherAgg>>> = let res: Vec<Option<Vec<WeatherAgg>>> =
result.into_iter().map(|x| x.ok()).collect(); result.into_iter().map(|x| x.ok()).collect();
Ok(HttpResponse::Ok().json(res)) Ok(HttpResponse::Ok().json(res))
@ -103,10 +114,10 @@ fn main() {
App::with_state(AppState{db: addr.clone()}) App::with_state(AppState{db: addr.clone()})
// enable logger // enable logger
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
.resource("/asyncio_weather", |r| .resource("/asyncio_weather", |r|
r.method(http::Method::GET) r.method(http::Method::GET)
.with(asyncio_weather)) .with(asyncio_weather))
.resource("/parallel_weather", |r| .resource("/parallel_weather", |r|
r.method(http::Method::GET) r.method(http::Method::GET)
.with(parallel_weather)) .with(parallel_weather))
}).bind("127.0.0.1:8080") }).bind("127.0.0.1:8080")

View File

@ -4,8 +4,10 @@ version = "0.1.0"
authors = ["dowwie <dkcdkg@gmail.com>"] authors = ["dowwie <dkcdkg@gmail.com>"]
[dependencies] [dependencies]
actix = "0.5.6" actix = "0.7"
actix-web = { version = "^0.6", features=["alpn"] } #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
futures = "0.1" futures = "0.1"
serde = "1.0.43" serde = "1.0.43"
serde_derive = "1.0.43" serde_derive = "1.0.43"

View File

@ -5,8 +5,10 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../" workspace = "../"
[dependencies] [dependencies]
actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
futures = "0.1" futures = "0.1"
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6"
bytes = "0.4" bytes = "0.4"

View File

@ -21,21 +21,14 @@ use futures::future::{result, FutureResult};
use std::{env, io}; use std::{env, io};
/// favicon handler /// favicon handler
fn favicon(req: HttpRequest) -> Result<fs::NamedFile> { fn favicon(req: &HttpRequest) -> Result<fs::NamedFile> {
Ok(fs::NamedFile::open("static/favicon.ico")?) Ok(fs::NamedFile::open("static/favicon.ico")?)
} }
/// simple index handler /// simple index handler
fn welcome(mut req: HttpRequest) -> Result<HttpResponse> { fn welcome(req: &HttpRequest) -> Result<HttpResponse> {
println!("{:?}", req); println!("{:?}", req);
// example of ...
if let Ok(ch) = req.poll() {
if let futures::Async::Ready(Some(d)) = ch {
println!("{}", String::from_utf8_lossy(d.as_ref()));
}
}
// session // session
let mut counter = 1; let mut counter = 1;
if let Some(count) = req.session().get::<i32>("counter")? { if let Some(count) = req.session().get::<i32>("counter")? {
@ -53,12 +46,12 @@ fn welcome(mut req: HttpRequest) -> Result<HttpResponse> {
} }
/// 404 handler /// 404 handler
fn p404(req: HttpRequest) -> Result<fs::NamedFile> { fn p404(req: &HttpRequest) -> Result<fs::NamedFile> {
Ok(fs::NamedFile::open("static/404.html")?.set_status_code(StatusCode::NOT_FOUND)) Ok(fs::NamedFile::open("static/404.html")?.set_status_code(StatusCode::NOT_FOUND))
} }
/// async handler /// async handler
fn index_async(req: HttpRequest) -> FutureResult<HttpResponse, Error> { fn index_async(req: &HttpRequest) -> FutureResult<HttpResponse, Error> {
println!("{:?}", req); println!("{:?}", req);
result(Ok(HttpResponse::Ok().content_type("text/html").body( result(Ok(HttpResponse::Ok().content_type("text/html").body(
@ -78,7 +71,7 @@ fn index_async_body(path: Path<String>) -> HttpResponse {
} }
/// handler with path parameters like `/user/{name}/` /// handler with path parameters like `/user/{name}/`
fn with_param(req: HttpRequest) -> HttpResponse { fn with_param(req: &HttpRequest) -> HttpResponse {
println!("{:?}", req); println!("{:?}", req);
HttpResponse::Ok() HttpResponse::Ok()
@ -122,7 +115,7 @@ fn main() {
io::Error::new(io::ErrorKind::Other, "test"), StatusCode::INTERNAL_SERVER_ERROR) io::Error::new(io::ErrorKind::Other, "test"), StatusCode::INTERNAL_SERVER_ERROR)
})) }))
// static files // static files
.handler("/static", fs::StaticFiles::new("static")) .handler("/static", fs::StaticFiles::new("static").unwrap())
// redirect // redirect
.resource("/", |r| r.method(Method::GET).f(|req| { .resource("/", |r| r.method(Method::GET).f(|req| {
println!("{:?}", req); println!("{:?}", req);

View File

@ -5,8 +5,10 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../" workspace = "../"
[dependencies] [dependencies]
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
cookie = { version="0.10", features=["percent-encode", "secure"] } cookie = { version="0.10", features=["percent-encode", "secure"] }
futures = "0.1" futures = "0.1"
time = "0.1" time = "0.1"

View File

@ -14,31 +14,31 @@ use actix_web::{Error, HttpRequest, HttpResponse, Result};
pub trait RequestIdentity { pub trait RequestIdentity {
/// Return the claimed identity of the user associated request or /// Return the claimed identity of the user associated request or
/// ``None`` if no identity can be found associated with the request. /// ``None`` if no identity can be found associated with the request.
fn identity(&mut self) -> Option<&str>; fn identity(&self) -> Option<String>;
/// Remember identity. /// Remember identity.
fn remember(&mut self, identity: String); fn remember(&self, identity: String);
/// This method is used to 'forget' the current identity on subsequent /// This method is used to 'forget' the current identity on subsequent
/// requests. /// requests.
fn forget(&mut self); fn forget(&self);
} }
impl<S> RequestIdentity for HttpRequest<S> { impl<S> RequestIdentity for HttpRequest<S> {
fn identity(&mut self) -> Option<&str> { fn identity(&self) -> Option<String> {
if let Some(id) = self.extensions().get::<IdentityBox>() { if let Some(id) = self.extensions().get::<IdentityBox>() {
return id.0.identity(); return id.0.identity().map(|s| s.to_owned());
} }
None None
} }
fn remember(&mut self, identity: String) { fn remember(&self, identity: String) {
if let Some(id) = self.extensions_mut().get_mut::<IdentityBox>() { if let Some(id) = self.extensions_mut().get_mut::<IdentityBox>() {
return id.0.remember(identity); return id.0.as_mut().remember(identity);
} }
} }
fn forget(&mut self) { fn forget(&self) {
if let Some(id) = self.extensions_mut().get_mut::<IdentityBox>() { if let Some(id) = self.extensions_mut().get_mut::<IdentityBox>() {
return id.0.forget(); return id.0.forget();
} }
@ -86,7 +86,7 @@ unsafe impl Send for IdentityBox {}
unsafe impl Sync for IdentityBox {} unsafe impl Sync for IdentityBox {}
impl<S: 'static, T: IdentityPolicy<S>> Middleware<S> for IdentityService<T> { impl<S: 'static, T: IdentityPolicy<S>> Middleware<S> for IdentityService<T> {
fn start(&self, req: &mut HttpRequest<S>) -> Result<Started> { fn start(&self, req: &HttpRequest<S>) -> Result<Started> {
let mut req = req.clone(); let mut req = req.clone();
let fut = self let fut = self
@ -102,9 +102,7 @@ impl<S: 'static, T: IdentityPolicy<S>> Middleware<S> for IdentityService<T> {
Ok(Started::Future(Box::new(fut))) Ok(Started::Future(Box::new(fut)))
} }
fn response( fn response(&self, req: &HttpRequest<S>, resp: HttpResponse) -> Result<Response> {
&self, req: &mut HttpRequest<S>, resp: HttpResponse,
) -> Result<Response> {
if let Some(mut id) = req.extensions_mut().remove::<IdentityBox>() { if let Some(mut id) = req.extensions_mut().remove::<IdentityBox>() {
id.0.write(resp) id.0.write(resp)
} else { } else {
@ -200,7 +198,7 @@ impl CookieIdentityInner {
fn load<S>(&self, req: &mut HttpRequest<S>) -> Option<String> { fn load<S>(&self, req: &mut HttpRequest<S>) -> Option<String> {
if let Ok(cookies) = req.cookies() { if let Ok(cookies) = req.cookies() {
for cookie in cookies { for cookie in cookies.iter() {
if cookie.name() == self.name { if cookie.name() == self.name {
let mut jar = CookieJar::new(); let mut jar = CookieJar::new();
jar.add_original(cookie.clone()); jar.add_original(cookie.clone());

View File

@ -10,16 +10,16 @@ use actix_web::{middleware, server, App, HttpRequest, HttpResponse};
mod auth; mod auth;
use auth::{CookieIdentityPolicy, IdentityService, RequestIdentity}; use auth::{CookieIdentityPolicy, IdentityService, RequestIdentity};
fn index(mut req: HttpRequest) -> String { fn index(req: &HttpRequest) -> String {
format!("Hello {}", req.identity().unwrap_or("Anonymous")) format!("Hello {}", req.identity().unwrap_or("Anonymous".to_owned()))
} }
fn login(mut req: HttpRequest) -> HttpResponse { fn login(req: &HttpRequest) -> HttpResponse {
req.remember("user1".to_owned()); req.remember("user1".to_owned());
HttpResponse::Found().header("location", "/").finish() HttpResponse::Found().header("location", "/").finish()
} }
fn logout(mut req: HttpRequest) -> HttpResponse { fn logout(req: &HttpRequest) -> HttpResponse {
req.forget(); req.forget();
HttpResponse::Found().header("location", "/").finish() HttpResponse::Found().header("location", "/").finish()
} }

View File

@ -5,8 +5,10 @@ authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../" workspace = "../"
[dependencies] [dependencies]
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
futures = "0.1" futures = "0.1"
time = "0.1" time = "0.1"
env_logger = "0.5" env_logger = "0.5"

View File

@ -15,7 +15,7 @@ use actix_web::{middleware, server, App, HttpRequest, Result};
use std::env; use std::env;
/// simple index handler with session /// simple index handler with session
fn index(req: HttpRequest) -> Result<&'static str> { fn index(req: &HttpRequest) -> Result<&'static str> {
println!("{:?}", req); println!("{:?}", req);
// RequestSession trait is used for session access // RequestSession trait is used for session access

View File

@ -6,8 +6,10 @@ workspace = "../"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
futures = "0.1" futures = "0.1"
uuid = { version = "0.5", features = ["serde", "v4"] } uuid = { version = "0.5", features = ["serde", "v4"] }

View File

@ -35,7 +35,7 @@ use db::{CreateUser, DbExecutor};
/// State with DbExecutor address /// State with DbExecutor address
struct AppState { struct AppState {
db: Addr<Syn, DbExecutor>, db: Addr<DbExecutor>,
} }
/// Async request handler /// Async request handler

View File

@ -4,7 +4,9 @@ version = "0.1.0"
authors = ["Gorm Casper <gcasper@gmail.com>"] authors = ["Gorm Casper <gcasper@gmail.com>"]
[dependencies] [dependencies]
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"

View File

@ -6,5 +6,7 @@ workspace = "../"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -4,7 +4,7 @@ extern crate env_logger;
use actix_web::{middleware, server, App, HttpRequest}; use actix_web::{middleware, server, App, HttpRequest};
fn index(_req: HttpRequest) -> &'static str { fn index(_req: &HttpRequest) -> &'static str {
"Hello world!" "Hello world!"
} }

View File

@ -15,5 +15,7 @@ path = "src/server.rs"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
futures = "0.1" futures = "0.1"
actix = "0.5"
actix-web = { version = "^0.6", features=["alpn"] } actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -10,7 +10,7 @@ use actix_web::{
use futures::{Future, Stream}; use futures::{Future, Stream};
/// Stream client request response and then send body to a server response /// Stream client request response and then send body to a server response
fn index(_req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> { fn index(_req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
client::ClientRequest::get("http://127.0.0.1:8081/") client::ClientRequest::get("http://127.0.0.1:8081/")
.finish().unwrap() .finish().unwrap()
.send() .send()
@ -25,7 +25,7 @@ fn index(_req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
} }
/// streaming client request to a streaming server response /// streaming client request to a streaming server response
fn streaming(_req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> { fn streaming(_req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
// send client request // send client request
client::ClientRequest::get("https://www.rust-lang.org/en-US/") client::ClientRequest::get("https://www.rust-lang.org/en-US/")
.finish().unwrap() .finish().unwrap()
@ -35,7 +35,7 @@ fn streaming(_req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
// read one chunk from client response and send this chunk to a server response // read one chunk from client response and send this chunk to a server response
// .from_err() converts PayloadError to an Error // .from_err() converts PayloadError to an Error
.body(Body::Streaming(Box::new(resp.from_err())))) .body(Body::Streaming(Box::new(resp.payload().from_err()))))
}) })
.responder() .responder()
} }

View File

@ -6,7 +6,7 @@ extern crate futures;
use actix_web::*; use actix_web::*;
use futures::Future; use futures::Future;
fn index(req: HttpRequest) -> FutureResponse<HttpResponse> { fn index(req: &HttpRequest) -> FutureResponse<HttpResponse> {
req.body() req.body()
.from_err() .from_err()
.map(|bytes| HttpResponse::Ok().body(bytes)) .map(|bytes| HttpResponse::Ok().body(bytes))

View File

@ -14,5 +14,6 @@ serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
json = "*" json = "*"
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -25,7 +25,7 @@ struct MyObj {
} }
/// This handler uses `HttpRequest::json()` for loading json object. /// This handler uses `HttpRequest::json()` for loading json object.
fn index(req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> { fn index(req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
req.json() req.json()
.from_err() // convert all errors into `Error` .from_err() // convert all errors into `Error`
.and_then(|val: MyObj| { .and_then(|val: MyObj| {
@ -50,9 +50,9 @@ fn extract_item_limit((item, _req): (Json<MyObj>, HttpRequest)) -> HttpResponse
const MAX_SIZE: usize = 262_144; // max payload size is 256k const MAX_SIZE: usize = 262_144; // max payload size is 256k
/// This handler manually load request payload and parse json object /// This handler manually load request payload and parse json object
fn index_manual(req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> { fn index_manual(req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
// HttpRequest is stream of Bytes objects // HttpRequest::payload() is stream of Bytes objects
req req.payload()
// `Future::from_err` acts like `?` in that it coerces the error type from // `Future::from_err` acts like `?` in that it coerces the error type from
// the future into the final error type // the future into the final error type
.from_err() .from_err()
@ -79,8 +79,11 @@ fn index_manual(req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Err
} }
/// This handler manually load request payload and parse json-rust /// This handler manually load request payload and parse json-rust
fn index_mjsonrust(req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> { fn index_mjsonrust(
req.concat2() req: &HttpRequest,
) -> Box<Future<Item = HttpResponse, Error = Error>> {
req.payload()
.concat2()
.from_err() .from_err()
.and_then(|body| { .and_then(|body| {
// body is loaded, now we can deserialize json-rust // body is loaded, now we can deserialize json-rust
@ -107,13 +110,15 @@ fn main() {
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
.resource("/extractor", |r| { .resource("/extractor", |r| {
r.method(http::Method::POST) r.method(http::Method::POST)
.with(extract_item) .with_config(extract_item, |cfg| {
.limit(4096); // <- limit size of the payload cfg.limit(4096); // <- limit size of the payload
})
}) })
.resource("/extractor2", |r| { .resource("/extractor2", |r| {
r.method(http::Method::POST) r.method(http::Method::POST)
.with(extract_item_limit) .with_config(extract_item_limit, |cfg| {
.0.limit(4096); // <- limit size of the payload cfg.0.limit(4096); // <- limit size of the payload
})
}) })
.resource("/manual", |r| r.method(http::Method::POST).f(index_manual)) .resource("/manual", |r| r.method(http::Method::POST).f(index_manual))
.resource("/mjsonrust", |r| r.method(http::Method::POST).f(index_mjsonrust)) .resource("/mjsonrust", |r| r.method(http::Method::POST).f(index_mjsonrust))

View File

@ -6,8 +6,10 @@ workspace = "../"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
futures = "0.1" futures = "0.1"
serde = "1.0" serde = "1.0"

View File

@ -27,7 +27,7 @@ use schema::create_schema;
use schema::Schema; use schema::Schema;
struct AppState { struct AppState {
executor: Addr<Syn, GraphQLExecutor>, executor: Addr<GraphQLExecutor>,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -61,7 +61,7 @@ impl Handler<GraphQLData> for GraphQLExecutor {
} }
} }
fn graphiql(_req: HttpRequest<AppState>) -> Result<HttpResponse, Error> { fn graphiql(_req: &HttpRequest<AppState>) -> Result<HttpResponse, Error> {
let html = graphiql_source("http://127.0.0.1:8080/graphql"); let html = graphiql_source("http://127.0.0.1:8080/graphql");
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.content_type("text/html; charset=utf-8") .content_type("text/html; charset=utf-8")

View File

@ -4,5 +4,6 @@ version = "0.1.0"
authors = ["Gorm Casper <gcasper@gmail.com>"] authors = ["Gorm Casper <gcasper@gmail.com>"]
[dependencies] [dependencies]
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -7,7 +7,7 @@ pub struct CheckLogin;
impl<S> Middleware<S> for CheckLogin { impl<S> Middleware<S> for CheckLogin {
// We only need to hook into the `start` for this middleware. // We only need to hook into the `start` for this middleware.
fn start(&self, req: &mut HttpRequest<S>) -> Result<Started> { fn start(&self, req: &HttpRequest<S>) -> Result<Started> {
let is_logged_in = false; // Change this to see the change in outcome in the browser let is_logged_in = false; // Change this to see the change in outcome in the browser
if is_logged_in { if is_logged_in {

View File

@ -8,21 +8,17 @@ use actix_web::{HttpRequest, HttpResponse, Result};
pub struct SayHi; pub struct SayHi;
impl<S> Middleware<S> for SayHi { impl<S> Middleware<S> for SayHi {
fn start(&self, req: &mut HttpRequest<S>) -> Result<Started> { fn start(&self, req: &HttpRequest<S>) -> Result<Started> {
println!("Hi from start. You requested: {}", req.path()); println!("Hi from start. You requested: {}", req.path());
Ok(Started::Done) Ok(Started::Done)
} }
fn response( fn response(&self, _req: &HttpRequest<S>, resp: HttpResponse) -> Result<Response> {
&self,
_req: &mut HttpRequest<S>,
resp: HttpResponse,
) -> Result<Response> {
println!("Hi from response"); println!("Hi from response");
Ok(Response::Done(resp)) Ok(Response::Done(resp))
} }
fn finish(&self, _req: &mut HttpRequest<S>, _resp: &HttpResponse) -> Finished { fn finish(&self, _req: &HttpRequest<S>, _resp: &HttpResponse) -> Finished {
println!("Hi from finish"); println!("Hi from finish");
Finished::Done Finished::Done
} }

View File

@ -11,5 +11,7 @@ path = "src/main.rs"
[dependencies] [dependencies]
env_logger = "*" env_logger = "*"
futures = "0.1" futures = "0.1"
actix = "0.5"
actix-web = "^0.6" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -4,13 +4,13 @@ extern crate actix_web;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
use std::cell::Cell;
use std::fs; use std::fs;
use std::io::Write; use std::io::Write;
use std::cell::Cell;
use actix_web::{ use actix_web::{
error, http, middleware, multipart, server, App, Error, FutureResponse, HttpMessage, dev, error, http, middleware, multipart, server, App, Error, FutureResponse,
HttpRequest, HttpResponse, HttpMessage, HttpRequest, HttpResponse,
}; };
use futures::future; use futures::future;
@ -20,7 +20,9 @@ pub struct AppState {
pub counter: Cell<usize>, pub counter: Cell<usize>,
} }
pub fn save_file( field: multipart::Field<HttpRequest<AppState>>) -> Box<Future<Item = i64, Error = Error>> { pub fn save_file(
field: multipart::Field<dev::Payload>,
) -> Box<Future<Item = i64, Error = Error>> {
let file_path_string = "upload.png"; let file_path_string = "upload.png";
let mut file = match fs::File::create(file_path_string) { let mut file = match fs::File::create(file_path_string) {
Ok(file) => file, Ok(file) => file,
@ -45,7 +47,9 @@ pub fn save_file( field: multipart::Field<HttpRequest<AppState>>) -> Box<Future<
) )
} }
pub fn handle_multipart_item( item: multipart::MultipartItem<HttpRequest<AppState>>) -> Box<Stream<Item = i64, Error = Error>> { pub fn handle_multipart_item(
item: multipart::MultipartItem<dev::Payload>,
) -> Box<Stream<Item = i64, Error = Error>> {
match item { match item {
multipart::MultipartItem::Field(field) => { multipart::MultipartItem::Field(field) => {
Box::new(save_file(field).into_stream()) Box::new(save_file(field).into_stream())
@ -60,10 +64,9 @@ pub fn handle_multipart_item( item: multipart::MultipartItem<HttpRequest<AppStat
pub fn upload(req: HttpRequest<AppState>) -> FutureResponse<HttpResponse> { pub fn upload(req: HttpRequest<AppState>) -> FutureResponse<HttpResponse> {
req.state().counter.set(req.state().counter.get() + 1); req.state().counter.set(req.state().counter.get() + 1);
println!("{:?}", req.state().counter.get()); println!("{:?}", req.state().counter.get());
Box::new( Box::new(
req.clone() req.multipart()
.multipart()
.map_err(error::ErrorInternalServerError) .map_err(error::ErrorInternalServerError)
.map(handle_multipart_item) .map(handle_multipart_item)
.flatten() .flatten()
@ -96,8 +99,9 @@ fn main() {
let sys = actix::System::new("multipart-example"); let sys = actix::System::new("multipart-example");
server::new(|| { server::new(|| {
App::with_state(AppState{counter: Cell::new(0)}) App::with_state(AppState {
.middleware(middleware::Logger::default()) counter: Cell::new(0),
}).middleware(middleware::Logger::default())
.resource("/", |r| { .resource("/", |r| {
r.method(http::Method::GET).with(index); r.method(http::Method::GET).with(index);
r.method(http::Method::POST).with(upload); r.method(http::Method::POST).with(upload);

View File

@ -13,5 +13,6 @@ env_logger = "*"
prost = "0.2.0" prost = "0.2.0"
prost-derive = "0.2.0" prost-derive = "0.2.0"
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -26,7 +26,7 @@ pub struct MyObj {
} }
/// This handler uses `ProtoBufMessage` for loading protobuf object. /// This handler uses `ProtoBufMessage` for loading protobuf object.
fn index(req: HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> { fn index(req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
protobuf::ProtoBufMessage::new(req) protobuf::ProtoBufMessage::new(req)
.from_err() // convert all errors into `Error` .from_err() // convert all errors into `Error`
.and_then(|val: MyObj| { .and_then(|val: MyObj| {

View File

@ -1,4 +1,4 @@
use bytes::{Bytes, BytesMut}; use bytes::BytesMut;
use futures::{Future, Poll, Stream}; use futures::{Future, Poll, Stream};
use bytes::IntoBuf; use bytes::IntoBuf;
@ -8,7 +8,7 @@ use prost::Message;
use actix_web::dev::HttpResponseBuilder; use actix_web::dev::HttpResponseBuilder;
use actix_web::error::{Error, PayloadError, ResponseError}; use actix_web::error::{Error, PayloadError, ResponseError};
use actix_web::http::header::{CONTENT_LENGTH, CONTENT_TYPE}; use actix_web::http::header::CONTENT_TYPE;
use actix_web::{HttpMessage, HttpRequest, HttpResponse, Responder}; use actix_web::{HttpMessage, HttpRequest, HttpResponse, Responder};
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
@ -72,84 +72,32 @@ impl<T: Message> Responder for ProtoBuf<T> {
} }
} }
pub struct ProtoBufMessage<T, U: Message + Default> { pub struct ProtoBufMessage<U: Message + Default> {
limit: usize, fut: Box<Future<Item = U, Error = ProtoBufPayloadError>>,
ct: &'static str,
req: Option<T>,
fut: Option<Box<Future<Item = U, Error = ProtoBufPayloadError>>>,
} }
impl<T, U: Message + Default> ProtoBufMessage<T, U> { impl<U: Message + Default + 'static> ProtoBufMessage<U> {
/// Create `ProtoBufMessage` for request. /// Create `ProtoBufMessage` for request.
pub fn new(req: T) -> Self { pub fn new(req: &HttpRequest) -> Self {
ProtoBufMessage { let fut = req
limit: 262_144, .payload()
req: Some(req), .map_err(|e| ProtoBufPayloadError::Payload(e))
fut: None, .fold(BytesMut::new(), move |mut body, chunk| {
ct: "application/protobuf", body.extend_from_slice(&chunk);
} Ok::<_, ProtoBufPayloadError>(body)
} })
.and_then(|body| Ok(<U>::decode(&mut body.into_buf())?));
/// Change max size of payload. By default max size is 256Kb ProtoBufMessage { fut: Box::new(fut) }
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
/// Set allowed content type.
///
/// By default *application/protobuf* content type is used. Set content type
/// to empty string if you want to disable content type check.
pub fn content_type(mut self, ct: &'static str) -> Self {
self.ct = ct;
self
} }
} }
impl<T, U: Message + Default + 'static> Future for ProtoBufMessage<T, U> impl<U: Message + Default + 'static> Future for ProtoBufMessage<U> where {
where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{
type Item = U; type Item = U;
type Error = ProtoBufPayloadError; type Error = ProtoBufPayloadError;
fn poll(&mut self) -> Poll<U, ProtoBufPayloadError> { fn poll(&mut self) -> Poll<U, ProtoBufPayloadError> {
if let Some(req) = self.req.take() { self.fut.poll()
if let Some(len) = req.headers().get(CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<usize>() {
if len > self.limit {
return Err(ProtoBufPayloadError::Overflow);
}
} else {
return Err(ProtoBufPayloadError::Overflow);
}
}
}
// check content-type
if !self.ct.is_empty() && req.content_type() != self.ct {
return Err(ProtoBufPayloadError::ContentType);
}
let limit = self.limit;
let fut = req
.from_err()
.fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(ProtoBufPayloadError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.and_then(|body| Ok(<U>::decode(&mut body.into_buf())?));
self.fut = Some(Box::new(fut));
}
self.fut
.as_mut()
.expect("ProtoBufBody could not be used second time")
.poll()
} }
} }

View File

@ -6,8 +6,10 @@ workspace = "../"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
futures = "0.1" futures = "0.1"
uuid = { version = "0.5", features = ["serde", "v4"] } uuid = { version = "0.5", features = ["serde", "v4"] }

View File

@ -22,11 +22,11 @@ use db::{CreateUser, DbExecutor};
/// State with DbExecutor address /// State with DbExecutor address
struct State { struct State {
db: Addr<Syn, DbExecutor>, db: Addr<DbExecutor>,
} }
/// Async request handler /// Async request handler
fn index(req: HttpRequest<State>) -> Box<Future<Item = HttpResponse, Error = Error>> { fn index(req: &HttpRequest<State>) -> Box<Future<Item = HttpResponse, Error = Error>> {
let name = &req.match_info()["name"]; let name = &req.match_info()["name"];
req.state() req.state()

View File

@ -7,5 +7,7 @@ workspace = "../"
[dependencies] [dependencies]
futures = "0.1" futures = "0.1"
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -25,7 +25,7 @@ struct AppState {
} }
/// simple handle /// simple handle
fn index(req: HttpRequest<AppState>) -> HttpResponse { fn index(req: &HttpRequest<AppState>) -> HttpResponse {
println!("{:?}", req); println!("{:?}", req);
req.state().counter.set(req.state().counter.get() + 1); req.state().counter.set(req.state().counter.get() + 1);

View File

@ -7,5 +7,7 @@ workspace = "../"
[dependencies] [dependencies]
futures = "0.1" futures = "0.1"
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "0.6" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -17,7 +17,7 @@ fn main() {
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
.handler( .handler(
"/", "/",
fs::StaticFiles::new("./static/").index_file("index.html") fs::StaticFiles::new("./static/").unwrap().index_file("index.html")
) )
}).bind("127.0.0.1:8080") }).bind("127.0.0.1:8080")
.expect("Can not start server on given IP/Port") .expect("Can not start server on given IP/Port")

View File

@ -6,9 +6,11 @@ workspace = "../"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6"
askama = "0.6" askama = "0.6"
actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
[build-dependencies] [build-dependencies]
askama = "0.6" askama = "0.6"

View File

@ -6,6 +6,7 @@ workspace = "../"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = "^0.6"
tera = "*" tera = "*"
actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -10,6 +10,7 @@ path = "src/main.rs"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5"
actix-web = { version = "^0.6", features=["alpn"] }
openssl = { version="0.10" } openssl = { version="0.10" }
actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -8,7 +8,7 @@ use actix_web::{http, middleware, server, App, Error, HttpRequest, HttpResponse}
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
/// simple handle /// simple handle
fn index(req: HttpRequest) -> Result<HttpResponse, Error> { fn index(req: &HttpRequest) -> Result<HttpResponse, Error> {
println!("{:?}", req); println!("{:?}", req);
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.content_type("text/plain") .content_type("text/plain")

View File

@ -6,6 +6,8 @@ workspace = "../"
[dependencies] [dependencies]
env_logger = "0.5" env_logger = "0.5"
actix = "0.5" tokio-uds = "0.2"
actix-web = "^0.6"
tokio-uds = "0.1" actix = "0.7"
#actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -7,7 +7,7 @@ use actix::*;
use actix_web::{middleware, server, App, HttpRequest}; use actix_web::{middleware, server, App, HttpRequest};
use tokio_uds::UnixListener; use tokio_uds::UnixListener;
fn index(_req: HttpRequest) -> &'static str { fn index(_req: &HttpRequest) -> &'static str {
"Hello world!" "Hello world!"
} }
@ -16,8 +16,7 @@ fn main() {
env_logger::init(); env_logger::init();
let sys = actix::System::new("unix-socket"); let sys = actix::System::new("unix-socket");
let listener = UnixListener::bind("/tmp/actix-uds.socket", Arbiter::handle()) let listener = UnixListener::bind("/tmp/actix-uds.socket").expect("bind failed");
.expect("bind failed");
server::new(|| { server::new(|| {
App::new() App::new()
// enable logger // enable logger

View File

@ -10,8 +10,10 @@ serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
http = "0.1" http = "0.1"
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }
dotenv = "0.10" dotenv = "0.10"
env_logger = "0.5" env_logger = "0.5"
futures = "0.1" futures = "0.1"

View File

@ -8,7 +8,10 @@ extern crate serde;
extern crate serde_json; extern crate serde_json;
use actix_web::{ use actix_web::{
http::{header, Method}, middleware, middleware::cors::Cors, server, App, http::{header, Method},
middleware,
middleware::cors::Cors,
server, App,
}; };
use std::env; use std::env;

View File

@ -20,5 +20,6 @@ env_logger = "*"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -24,11 +24,11 @@ mod server;
/// This is our websocket route state, this state is shared with all route /// This is our websocket route state, this state is shared with all route
/// instances via `HttpContext::state()` /// instances via `HttpContext::state()`
struct WsChatSessionState { struct WsChatSessionState {
addr: Addr<Syn, server::ChatServer>, addr: Addr<server::ChatServer>,
} }
/// Entry point for our route /// Entry point for our route
fn chat_route(req: HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> { fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
ws::start( ws::start(
req, req,
WsChatSession { WsChatSession {
@ -63,7 +63,7 @@ impl Actor for WsChatSession {
// before processing any other events. // before processing any other events.
// HttpContext::state() is instance of WsChatSessionState, state is shared // HttpContext::state() is instance of WsChatSessionState, state is shared
// across all routes within application // across all routes within application
let addr: Addr<Syn, _> = ctx.address(); let addr = ctx.address();
ctx.state() ctx.state()
.addr .addr
.send(server::Connect { .send(server::Connect {
@ -183,7 +183,7 @@ fn main() {
let sys = actix::System::new("websocket-example"); let sys = actix::System::new("websocket-example");
// Start chat server actor in separate thread // Start chat server actor in separate thread
let server: Addr<Syn, _> = Arbiter::start(|_| server::ChatServer::default()); let server = Arbiter::start(|_| server::ChatServer::default());
// Create Http server with websocket support // Create Http server with websocket support
HttpServer::new(move || { HttpServer::new(move || {
@ -193,16 +193,16 @@ fn main() {
}; };
App::with_state(state) App::with_state(state)
// redirect to websocket.html // redirect to websocket.html
.resource("/", |r| r.method(http::Method::GET).f(|_| { .resource("/", |r| r.method(http::Method::GET).f(|_| {
HttpResponse::Found() HttpResponse::Found()
.header("LOCATION", "/static/websocket.html") .header("LOCATION", "/static/websocket.html")
.finish() .finish()
})) }))
// websocket // websocket
.resource("/ws/", |r| r.route().f(chat_route)) .resource("/ws/", |r| r.route().f(chat_route))
// static resources // static resources
.handler("/static/", fs::StaticFiles::new("static/")) .handler("/static/", fs::StaticFiles::new("static/").unwrap())
}).bind("127.0.0.1:8080") }).bind("127.0.0.1:8080")
.unwrap() .unwrap()
.start(); .start();

View File

@ -17,7 +17,7 @@ pub struct Message(pub String);
#[derive(Message)] #[derive(Message)]
#[rtype(usize)] #[rtype(usize)]
pub struct Connect { pub struct Connect {
pub addr: Recipient<Syn, Message>, pub addr: Recipient<Message>,
} }
/// Session is disconnected /// Session is disconnected
@ -56,7 +56,7 @@ pub struct Join {
/// `ChatServer` manages chat rooms and responsible for coordinating chat /// `ChatServer` manages chat rooms and responsible for coordinating chat
/// session. implementation is super primitive /// session. implementation is super primitive
pub struct ChatServer { pub struct ChatServer {
sessions: HashMap<usize, Recipient<Syn, Message>>, sessions: HashMap<usize, Recipient<Message>>,
rooms: HashMap<String, HashSet<usize>>, rooms: HashMap<String, HashSet<usize>>,
rng: RefCell<ThreadRng>, rng: RefCell<ThreadRng>,
} }

View File

@ -18,12 +18,14 @@ bytes = "0.4"
byteorder = "1.1" byteorder = "1.1"
futures = "0.1" futures = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-core = "0.1" tokio-tcp = "0.1"
tokio-codec = "0.1"
env_logger = "*" env_logger = "*"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -5,8 +5,9 @@ extern crate bytes;
extern crate futures; extern crate futures;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate tokio_core; extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_tcp;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
@ -15,10 +16,10 @@ use futures::Future;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use std::{io, net, process, thread}; use std::{io, net, process, thread};
use tokio_core::net::TcpStream; use tokio_codec::FramedRead;
use tokio_io::codec::FramedRead;
use tokio_io::io::WriteHalf; use tokio_io::io::WriteHalf;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use tokio_tcp::TcpStream;
mod codec; mod codec;
@ -27,10 +28,10 @@ fn main() {
// Connect to server // Connect to server
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
Arbiter::handle().spawn( Arbiter::spawn(
TcpStream::connect(&addr, Arbiter::handle()) TcpStream::connect(&addr)
.and_then(|stream| { .and_then(|stream| {
let addr: Addr<Syn, _> = ChatClient::create(|ctx| { let addr = ChatClient::create(|ctx| {
let (r, w) = stream.split(); let (r, w) = stream.split();
ChatClient::add_stream( ChatClient::add_stream(
FramedRead::new(r, codec::ClientChatCodec), FramedRead::new(r, codec::ClientChatCodec),
@ -87,7 +88,7 @@ impl Actor for ChatClient {
println!("Disconnected"); println!("Disconnected");
// Stop application on disconnect // Stop application on disconnect
Arbiter::system().do_send(actix::msgs::SystemExit(0)); System::current().stop();
} }
} }

View File

@ -6,8 +6,9 @@ extern crate futures;
extern crate rand; extern crate rand;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate tokio_core; extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_tcp;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
@ -28,11 +29,11 @@ mod session;
/// This is our websocket route state, this state is shared with all route /// This is our websocket route state, this state is shared with all route
/// instances via `HttpContext::state()` /// instances via `HttpContext::state()`
struct WsChatSessionState { struct WsChatSessionState {
addr: Addr<Syn, server::ChatServer>, addr: Addr<server::ChatServer>,
} }
/// Entry point for our route /// Entry point for our route
fn chat_route(req: HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> { fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
ws::start( ws::start(
req, req,
WsChatSession { WsChatSession {
@ -67,7 +68,7 @@ impl Actor for WsChatSession {
// before processing any other events. // before processing any other events.
// HttpContext::state() is instance of WsChatSessionState, state is shared // HttpContext::state() is instance of WsChatSessionState, state is shared
// across all routes within application // across all routes within application
let addr: Addr<Syn, _> = ctx.address(); let addr = ctx.address();
ctx.state() ctx.state()
.addr .addr
.send(server::Connect { .send(server::Connect {
@ -187,7 +188,7 @@ fn main() {
let sys = actix::System::new("websocket-example"); let sys = actix::System::new("websocket-example");
// Start chat server actor in separate thread // Start chat server actor in separate thread
let server: Addr<Syn, _> = Arbiter::start(|_| server::ChatServer::default()); let server = Arbiter::start(|_| server::ChatServer::default());
// Start tcp server in separate thread // Start tcp server in separate thread
let srv = server.clone(); let srv = server.clone();
@ -204,16 +205,16 @@ fn main() {
}; };
App::with_state(state) App::with_state(state)
// redirect to websocket.html // redirect to websocket.html
.resource("/", |r| r.method(http::Method::GET).f(|_| { .resource("/", |r| r.method(http::Method::GET).f(|_| {
HttpResponse::Found() HttpResponse::Found()
.header("LOCATION", "/static/websocket.html") .header("LOCATION", "/static/websocket.html")
.finish() .finish()
})) }))
// websocket // websocket
.resource("/ws/", |r| r.route().f(chat_route)) .resource("/ws/", |r| r.route().f(chat_route))
// static resources // static resources
.handler("/static/", fs::StaticFiles::new("static/")) .handler("/static/", fs::StaticFiles::new("static/").unwrap())
}).bind("127.0.0.1:8080") }).bind("127.0.0.1:8080")
.unwrap() .unwrap()
.start(); .start();

View File

@ -15,7 +15,7 @@ use session;
#[derive(Message)] #[derive(Message)]
#[rtype(usize)] #[rtype(usize)]
pub struct Connect { pub struct Connect {
pub addr: Recipient<Syn, session::Message>, pub addr: Recipient<session::Message>,
} }
/// Session is disconnected /// Session is disconnected
@ -54,7 +54,7 @@ pub struct Join {
/// `ChatServer` manages chat rooms and responsible for coordinating chat /// `ChatServer` manages chat rooms and responsible for coordinating chat
/// session. implementation is super primitive /// session. implementation is super primitive
pub struct ChatServer { pub struct ChatServer {
sessions: HashMap<usize, Recipient<Syn, session::Message>>, sessions: HashMap<usize, Recipient<session::Message>>,
rooms: HashMap<String, HashSet<usize>>, rooms: HashMap<String, HashSet<usize>>,
rng: RefCell<ThreadRng>, rng: RefCell<ThreadRng>,
} }

View File

@ -4,10 +4,10 @@ use futures::Stream;
use std::str::FromStr; use std::str::FromStr;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{io, net}; use std::{io, net};
use tokio_core::net::{TcpListener, TcpStream}; use tokio_codec::FramedRead;
use tokio_io::codec::FramedRead;
use tokio_io::io::WriteHalf; use tokio_io::io::WriteHalf;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use tokio_tcp::{TcpListener, TcpStream};
use actix::prelude::*; use actix::prelude::*;
@ -23,7 +23,7 @@ pub struct ChatSession {
/// unique session id /// unique session id
id: usize, id: usize,
/// this is address of chat server /// this is address of chat server
addr: Addr<Syn, ChatServer>, addr: Addr<ChatServer>,
/// Client must send ping at least once per 10 seconds, otherwise we drop /// Client must send ping at least once per 10 seconds, otherwise we drop
/// connection. /// connection.
hb: Instant, hb: Instant,
@ -45,7 +45,7 @@ impl Actor for ChatSession {
// register self in chat server. `AsyncContext::wait` register // register self in chat server. `AsyncContext::wait` register
// future within context, but context waits until this future resolves // future within context, but context waits until this future resolves
// before processing any other events. // before processing any other events.
let addr: Addr<Syn, _> = ctx.address(); let addr = ctx.address();
self.addr self.addr
.send(server::Connect { .send(server::Connect {
addr: addr.recipient(), addr: addr.recipient(),
@ -133,7 +133,7 @@ impl Handler<Message> for ChatSession {
/// Helper methods /// Helper methods
impl ChatSession { impl ChatSession {
pub fn new( pub fn new(
addr: Addr<Syn, ChatServer>, addr: Addr<ChatServer>,
framed: actix::io::FramedWrite<WriteHalf<TcpStream>, ChatCodec>, framed: actix::io::FramedWrite<WriteHalf<TcpStream>, ChatCodec>,
) -> ChatSession { ) -> ChatSession {
ChatSession { ChatSession {
@ -172,14 +172,14 @@ impl ChatSession {
/// Define tcp server that will accept incoming tcp connection and create /// Define tcp server that will accept incoming tcp connection and create
/// chat actors. /// chat actors.
pub struct TcpServer { pub struct TcpServer {
chat: Addr<Syn, ChatServer>, chat: Addr<ChatServer>,
} }
impl TcpServer { impl TcpServer {
pub fn new(s: &str, chat: Addr<Syn, ChatServer>) { pub fn new(s: &str, chat: Addr<ChatServer>) {
// Create server listener // Create server listener
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); let listener = TcpListener::bind(&addr).unwrap();
// Our chat server `Server` is an actor, first we need to start it // Our chat server `Server` is an actor, first we need to start it
// and then add stream on incoming tcp connections to it. // and then add stream on incoming tcp connections to it.
@ -187,12 +187,9 @@ impl TcpServer {
// items So to be able to handle this events `Server` actor has to // items So to be able to handle this events `Server` actor has to
// implement stream handler `StreamHandler<(TcpStream, // implement stream handler `StreamHandler<(TcpStream,
// net::SocketAddr), io::Error>` // net::SocketAddr), io::Error>`
let _: () = TcpServer::create(|ctx| { TcpServer::create(|ctx| {
ctx.add_message_stream( ctx.add_message_stream(
listener listener.incoming().map_err(|_| ()).map(|s| TcpConnect(s)),
.incoming()
.map_err(|_| ())
.map(|(t, a)| TcpConnect(t, a)),
); );
TcpServer { chat: chat } TcpServer { chat: chat }
}); });
@ -206,7 +203,7 @@ impl Actor for TcpServer {
} }
#[derive(Message)] #[derive(Message)]
struct TcpConnect(TcpStream, net::SocketAddr); struct TcpConnect(TcpStream);
/// Handle stream of TcpStream's /// Handle stream of TcpStream's
impl Handler<TcpConnect> for TcpServer { impl Handler<TcpConnect> for TcpServer {
@ -216,7 +213,7 @@ impl Handler<TcpConnect> for TcpServer {
// For each incoming connection we create `ChatSession` actor // For each incoming connection we create `ChatSession` actor
// with out chat server address. // with out chat server address.
let server = self.chat.clone(); let server = self.chat.clone();
let _: () = ChatSession::create(|ctx| { ChatSession::create(|ctx| {
let (r, w) = msg.0.split(); let (r, w) = msg.0.split();
ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx);
ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx)) ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx))

View File

@ -15,6 +15,7 @@ path = "src/client.rs"
[dependencies] [dependencies]
env_logger = "*" env_logger = "*"
futures = "0.1" futures = "0.1"
tokio-core = "0.1"
actix = "0.5" actix = "0.7"
actix-web = "^0.6" #actix-web = "^0.7"
actix-web = { git = "https://github.com/actix/actix-web.git" }

View File

@ -5,13 +5,11 @@ extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
extern crate tokio_core;
use std::time::Duration; use std::time::Duration;
use std::{io, thread}; use std::{io, thread};
use actix::*; use actix::*;
use actix_web::ws::WsWriter;
use actix_web::ws::{Client, ClientWriter, Message, ProtocolError}; use actix_web::ws::{Client, ClientWriter, Message, ProtocolError};
use futures::Future; use futures::Future;
@ -20,7 +18,7 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("ws-example"); let sys = actix::System::new("ws-example");
Arbiter::handle().spawn( Arbiter::spawn(
Client::new("http://127.0.0.1:8080/ws/") Client::new("http://127.0.0.1:8080/ws/")
.connect() .connect()
.map_err(|e| { .map_err(|e| {
@ -28,7 +26,7 @@ fn main() {
() ()
}) })
.map(|(reader, writer)| { .map(|(reader, writer)| {
let addr: Addr<Syn, _> = ChatClient::create(|ctx| { let addr = ChatClient::create(|ctx| {
ChatClient::add_stream(reader, ctx); ChatClient::add_stream(reader, ctx);
ChatClient(writer) ChatClient(writer)
}); });
@ -67,7 +65,7 @@ impl Actor for ChatClient {
println!("Disconnected"); println!("Disconnected");
// Stop application on disconnect // Stop application on disconnect
Arbiter::system().do_send(actix::msgs::SystemExit(0)); System::current().stop();
} }
} }

View File

@ -14,7 +14,7 @@ use actix_web::{
}; };
/// do websocket handshake and start `MyWebSocket` actor /// do websocket handshake and start `MyWebSocket` actor
fn ws_index(r: HttpRequest) -> Result<HttpResponse, Error> { fn ws_index(r: &HttpRequest) -> Result<HttpResponse, Error> {
ws::start(r, MyWebSocket) ws::start(r, MyWebSocket)
} }
@ -56,6 +56,7 @@ fn main() {
.resource("/ws/", |r| r.method(http::Method::GET).f(ws_index)) .resource("/ws/", |r| r.method(http::Method::GET).f(ws_index))
// static files // static files
.handler("/", fs::StaticFiles::new("static/") .handler("/", fs::StaticFiles::new("static/")
.unwrap()
.index_file("index.html"))) .index_file("index.html")))
// start http server on 127.0.0.1:8080 // start http server on 127.0.0.1:8080
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()