mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-28 01:32:57 +01:00
actix 0.5.5, ws test
This commit is contained in:
parent
e7ec0f9fd7
commit
35ee5d36d8
@ -42,7 +42,7 @@ session = ["cookie/secure"]
|
|||||||
brotli = ["brotli2"]
|
brotli = ["brotli2"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix = "^0.5.4"
|
actix = "^0.5.5"
|
||||||
|
|
||||||
base64 = "0.9"
|
base64 = "0.9"
|
||||||
bitflags = "1.0"
|
bitflags = "1.0"
|
||||||
|
@ -557,8 +557,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
|
|||||||
Ok(result) => res = Some(result),
|
Ok(result) => res = Some(result),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Frame::Drain(fut) =>
|
Frame::Drain(fut) => self.drain = Some(fut),
|
||||||
self.drain = Some(fut),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.iostate = IOState::Actor(ctx);
|
self.iostate = IOState::Actor(ctx);
|
||||||
|
@ -241,7 +241,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
|||||||
self.encoder.write(payload)?;
|
self.encoder.write(payload)?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// might be response to EXCEPT
|
// could be response to EXCEPT header
|
||||||
self.buffer.extend_from_slice(payload.as_ref())
|
self.buffer.extend_from_slice(payload.as_ref())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -456,14 +456,12 @@ impl Stream for ClientReader {
|
|||||||
Ok(Async::Ready(Some(frame))) => {
|
Ok(Async::Ready(Some(frame))) => {
|
||||||
let (finished, opcode, payload) = frame.unpack();
|
let (finished, opcode, payload) = frame.unpack();
|
||||||
|
|
||||||
|
match opcode {
|
||||||
// continuation is not supported
|
// continuation is not supported
|
||||||
if !finished {
|
OpCode::Continue => {
|
||||||
inner.closed = true;
|
inner.closed = true;
|
||||||
return Err(ProtocolError::NoContinuation)
|
return Err(ProtocolError::NoContinuation)
|
||||||
}
|
},
|
||||||
|
|
||||||
match opcode {
|
|
||||||
OpCode::Continue => unimplemented!(),
|
|
||||||
OpCode::Bad => {
|
OpCode::Bad => {
|
||||||
inner.closed = true;
|
inner.closed = true;
|
||||||
Err(ProtocolError::BadOpCode)
|
Err(ProtocolError::BadOpCode)
|
||||||
|
@ -91,3 +91,56 @@ fn test_large_bin() {
|
|||||||
assert_eq!(item, Some(ws::Message::Binary(Binary::from(data.clone()))));
|
assert_eq!(item, Some(ws::Message::Binary(Binary::from(data.clone()))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct Ws2 {
|
||||||
|
count: usize
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Actor for Ws2 {
|
||||||
|
type Context = ws::WebsocketContext<Self>;
|
||||||
|
|
||||||
|
fn started(&mut self, ctx: &mut Self::Context) {
|
||||||
|
self.send(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ws2 {
|
||||||
|
fn send(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
|
||||||
|
ctx.text("0".repeat(65_536));
|
||||||
|
ctx.drain().and_then(|_, act, ctx| {
|
||||||
|
act.count += 1;
|
||||||
|
if act.count != 100 {
|
||||||
|
act.send(ctx);
|
||||||
|
}
|
||||||
|
actix::fut::ok(())
|
||||||
|
}).wait(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws2 {
|
||||||
|
|
||||||
|
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
|
||||||
|
match msg {
|
||||||
|
ws::Message::Ping(msg) => ctx.pong(&msg),
|
||||||
|
ws::Message::Text(text) => ctx.text(text),
|
||||||
|
ws::Message::Binary(bin) => ctx.binary(bin),
|
||||||
|
ws::Message::Close(reason) => ctx.close(reason, ""),
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_server_send_text() {
|
||||||
|
let data = Some(ws::Message::Text("0".repeat(65_536)));
|
||||||
|
|
||||||
|
let mut srv = test::TestServer::new(
|
||||||
|
|app| app.handler(|req| ws::start(req, Ws2{count:0})));
|
||||||
|
let (mut reader, _writer) = srv.ws().unwrap();
|
||||||
|
|
||||||
|
for _ in 0..100 {
|
||||||
|
let (item, r) = srv.execute(reader.into_future()).unwrap();
|
||||||
|
reader = r;
|
||||||
|
assert_eq!(item, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user