From 35ee5d36d8e6e374467a65a172b86d4fd2921f06 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 19 Mar 2018 13:12:36 -0700 Subject: [PATCH] actix 0.5.5, ws test --- Cargo.toml | 2 +- src/pipeline.rs | 3 +-- src/server/h1writer.rs | 2 +- src/ws/client.rs | 12 ++++------ tests/test_ws.rs | 53 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 61 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 552b1b17..4527d934 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ session = ["cookie/secure"] brotli = ["brotli2"] [dependencies] -actix = "^0.5.4" +actix = "^0.5.5" base64 = "0.9" bitflags = "1.0" diff --git a/src/pipeline.rs b/src/pipeline.rs index 0f9b3533..e92e16f5 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -557,8 +557,7 @@ impl ProcessResponse { Ok(result) => res = Some(result), } }, - Frame::Drain(fut) => - self.drain = Some(fut), + Frame::Drain(fut) => self.drain = Some(fut), } } self.iostate = IOState::Actor(ctx); diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 68ce71af..531e3c8d 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -241,7 +241,7 @@ impl Writer for H1Writer { self.encoder.write(payload)?; } } else { - // might be response to EXCEPT + // could be response to EXCEPT header self.buffer.extend_from_slice(payload.as_ref()) } } diff --git a/src/ws/client.rs b/src/ws/client.rs index 049930ea..c5fdcf79 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -456,14 +456,12 @@ impl Stream for ClientReader { Ok(Async::Ready(Some(frame))) => { let (finished, opcode, payload) = frame.unpack(); - // continuation is not supported - if !finished { - inner.closed = true; - return Err(ProtocolError::NoContinuation) - } - match opcode { - OpCode::Continue => unimplemented!(), + // continuation is not supported + OpCode::Continue => { + inner.closed = true; + return Err(ProtocolError::NoContinuation) + }, OpCode::Bad => { inner.closed = true; Err(ProtocolError::BadOpCode) diff --git a/tests/test_ws.rs b/tests/test_ws.rs index c1319b26..4d3ed472 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -91,3 +91,56 @@ fn test_large_bin() { assert_eq!(item, Some(ws::Message::Binary(Binary::from(data.clone())))); } } + +struct Ws2 { + count: usize +} + +impl Actor for Ws2 { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + self.send(ctx); + } +} + +impl Ws2 { + fn send(&mut self, ctx: &mut ws::WebsocketContext) { + ctx.text("0".repeat(65_536)); + ctx.drain().and_then(|_, act, ctx| { + act.count += 1; + if act.count != 100 { + act.send(ctx); + } + actix::fut::ok(()) + }).wait(ctx); + } +} + +impl StreamHandler for Ws2 { + + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + match msg { + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Text(text) => ctx.text(text), + ws::Message::Binary(bin) => ctx.binary(bin), + ws::Message::Close(reason) => ctx.close(reason, ""), + _ => (), + } + } +} + +#[test] +fn test_server_send_text() { + let data = Some(ws::Message::Text("0".repeat(65_536))); + + let mut srv = test::TestServer::new( + |app| app.handler(|req| ws::start(req, Ws2{count:0}))); + let (mut reader, _writer) = srv.ws().unwrap(); + + for _ in 0..100 { + let (item, r) = srv.execute(reader.into_future()).unwrap(); + reader = r; + assert_eq!(item, data); + } +}