|
|
|
@@ -35,15 +35,68 @@ where
|
|
|
|
|
Ok(res.streaming(WebsocketContext::create(actor, stream)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Do websocket handshake and start ws actor.
|
|
|
|
|
///
|
|
|
|
|
/// `req` is an HTTP Request that should be requesting a websocket protocol
|
|
|
|
|
/// change. `stream` should be a `Bytes` stream (such as
|
|
|
|
|
/// `actix_web::web::Payload`) that contains a stream of the body request.
|
|
|
|
|
///
|
|
|
|
|
/// If there is a problem with the handshake, an error is returned.
|
|
|
|
|
///
|
|
|
|
|
/// If successful, returns a pair where the first item is an address for the
|
|
|
|
|
/// created actor and the second item is the response that should be returned
|
|
|
|
|
/// from the websocket request.
|
|
|
|
|
pub fn start_with_addr<A, T>(
|
|
|
|
|
actor: A,
|
|
|
|
|
req: &HttpRequest,
|
|
|
|
|
stream: T,
|
|
|
|
|
) -> Result<(Addr<A>, HttpResponse), Error>
|
|
|
|
|
where
|
|
|
|
|
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
|
|
|
|
|
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
|
|
|
|
{
|
|
|
|
|
let mut res = handshake(req)?;
|
|
|
|
|
let (addr, out_stream) = WebsocketContext::create_with_addr(actor, stream);
|
|
|
|
|
Ok((addr, res.streaming(out_stream)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Do websocket handshake and start ws actor.
|
|
|
|
|
///
|
|
|
|
|
/// `protocols` is a sequence of known protocols.
|
|
|
|
|
pub fn start_with_protocols<A, T>(
|
|
|
|
|
actor: A,
|
|
|
|
|
protocols: &[&str],
|
|
|
|
|
req: &HttpRequest,
|
|
|
|
|
stream: T,
|
|
|
|
|
) -> Result<HttpResponse, Error>
|
|
|
|
|
where
|
|
|
|
|
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
|
|
|
|
|
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
|
|
|
|
{
|
|
|
|
|
let mut res = handshake_with_protocols(req, protocols)?;
|
|
|
|
|
Ok(res.streaming(WebsocketContext::create(actor, stream)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Prepare `WebSocket` handshake response.
|
|
|
|
|
///
|
|
|
|
|
/// This function returns handshake `HttpResponse`, ready to send to peer.
|
|
|
|
|
/// It does not perform any IO.
|
|
|
|
|
pub fn handshake(req: &HttpRequest) -> Result<HttpResponseBuilder, HandshakeError> {
|
|
|
|
|
handshake_with_protocols(req, &[])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Prepare `WebSocket` handshake response.
|
|
|
|
|
///
|
|
|
|
|
/// This function returns handshake `HttpResponse`, ready to send to peer.
|
|
|
|
|
/// It does not perform any IO.
|
|
|
|
|
///
|
|
|
|
|
// /// `protocols` is a sequence of known protocols. On successful handshake,
|
|
|
|
|
// /// the returned response headers contain the first protocol in this list
|
|
|
|
|
// /// which the server also knows.
|
|
|
|
|
pub fn handshake(req: &HttpRequest) -> Result<HttpResponseBuilder, HandshakeError> {
|
|
|
|
|
/// `protocols` is a sequence of known protocols. On successful handshake,
|
|
|
|
|
/// the returned response headers contain the first protocol in this list
|
|
|
|
|
/// which the server also knows.
|
|
|
|
|
pub fn handshake_with_protocols(
|
|
|
|
|
req: &HttpRequest,
|
|
|
|
|
protocols: &[&str],
|
|
|
|
|
) -> Result<HttpResponseBuilder, HandshakeError> {
|
|
|
|
|
// WebSocket accepts only GET
|
|
|
|
|
if *req.method() != Method::GET {
|
|
|
|
|
return Err(HandshakeError::GetMethodRequired);
|
|
|
|
@@ -92,11 +145,28 @@ pub fn handshake(req: &HttpRequest) -> Result<HttpResponseBuilder, HandshakeErro
|
|
|
|
|
hash_key(key.as_ref())
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(HttpResponse::build(StatusCode::SWITCHING_PROTOCOLS)
|
|
|
|
|
// check requested protocols
|
|
|
|
|
let protocol =
|
|
|
|
|
req.headers()
|
|
|
|
|
.get(&header::SEC_WEBSOCKET_PROTOCOL)
|
|
|
|
|
.and_then(|req_protocols| {
|
|
|
|
|
let req_protocols = req_protocols.to_str().ok()?;
|
|
|
|
|
req_protocols
|
|
|
|
|
.split(", ")
|
|
|
|
|
.find(|req_p| protocols.iter().any(|p| p == req_p))
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let mut response = HttpResponse::build(StatusCode::SWITCHING_PROTOCOLS)
|
|
|
|
|
.upgrade("websocket")
|
|
|
|
|
.header(header::TRANSFER_ENCODING, "chunked")
|
|
|
|
|
.header(header::SEC_WEBSOCKET_ACCEPT, key.as_str())
|
|
|
|
|
.take())
|
|
|
|
|
.take();
|
|
|
|
|
|
|
|
|
|
if let Some(protocol) = protocol {
|
|
|
|
|
response.header(&header::SEC_WEBSOCKET_PROTOCOL, protocol);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(response)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Execution context for `WebSockets` actors
|
|
|
|
@@ -168,6 +238,24 @@ where
|
|
|
|
|
#[inline]
|
|
|
|
|
/// Create a new Websocket context from a request and an actor
|
|
|
|
|
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Bytes, Error = Error>
|
|
|
|
|
where
|
|
|
|
|
A: StreamHandler<Message, ProtocolError>,
|
|
|
|
|
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
|
|
|
|
{
|
|
|
|
|
let (_, stream) = WebsocketContext::create_with_addr(actor, stream);
|
|
|
|
|
stream
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
/// Create a new Websocket context from a request and an actor.
|
|
|
|
|
///
|
|
|
|
|
/// Returns a pair, where the first item is an addr for the created actor,
|
|
|
|
|
/// and the second item is a stream intended to be set as part of the
|
|
|
|
|
/// response via `HttpResponseBuilder::streaming()`.
|
|
|
|
|
pub fn create_with_addr<S>(
|
|
|
|
|
actor: A,
|
|
|
|
|
stream: S,
|
|
|
|
|
) -> (Addr<A>, impl Stream<Item = Bytes, Error = Error>)
|
|
|
|
|
where
|
|
|
|
|
A: StreamHandler<Message, ProtocolError>,
|
|
|
|
|
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
|
|
|
@@ -179,7 +267,9 @@ where
|
|
|
|
|
};
|
|
|
|
|
ctx.add_stream(WsStream::new(stream, Codec::new()));
|
|
|
|
|
|
|
|
|
|
WebsocketContextFut::new(ctx, actor, mb, Codec::new())
|
|
|
|
|
let addr = ctx.address();
|
|
|
|
|
|
|
|
|
|
(addr, WebsocketContextFut::new(ctx, actor, mb, Codec::new()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
@@ -564,5 +654,87 @@ mod tests {
|
|
|
|
|
StatusCode::SWITCHING_PROTOCOLS,
|
|
|
|
|
handshake(&req).unwrap().finish().status()
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let req = TestRequest::default()
|
|
|
|
|
.header(
|
|
|
|
|
header::UPGRADE,
|
|
|
|
|
header::HeaderValue::from_static("websocket"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::CONNECTION,
|
|
|
|
|
header::HeaderValue::from_static("upgrade"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::SEC_WEBSOCKET_VERSION,
|
|
|
|
|
header::HeaderValue::from_static("13"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::SEC_WEBSOCKET_KEY,
|
|
|
|
|
header::HeaderValue::from_static("13"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::SEC_WEBSOCKET_PROTOCOL,
|
|
|
|
|
header::HeaderValue::from_static("graphql"),
|
|
|
|
|
)
|
|
|
|
|
.to_http_request();
|
|
|
|
|
|
|
|
|
|
let protocols = ["graphql"];
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
StatusCode::SWITCHING_PROTOCOLS,
|
|
|
|
|
handshake_with_protocols(&req, &protocols)
|
|
|
|
|
.unwrap()
|
|
|
|
|
.finish()
|
|
|
|
|
.status()
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
Some(&header::HeaderValue::from_static("graphql")),
|
|
|
|
|
handshake_with_protocols(&req, &protocols)
|
|
|
|
|
.unwrap()
|
|
|
|
|
.finish()
|
|
|
|
|
.headers()
|
|
|
|
|
.get(&header::SEC_WEBSOCKET_PROTOCOL)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let req = TestRequest::default()
|
|
|
|
|
.header(
|
|
|
|
|
header::UPGRADE,
|
|
|
|
|
header::HeaderValue::from_static("websocket"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::CONNECTION,
|
|
|
|
|
header::HeaderValue::from_static("upgrade"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::SEC_WEBSOCKET_VERSION,
|
|
|
|
|
header::HeaderValue::from_static("13"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::SEC_WEBSOCKET_KEY,
|
|
|
|
|
header::HeaderValue::from_static("13"),
|
|
|
|
|
)
|
|
|
|
|
.header(
|
|
|
|
|
header::SEC_WEBSOCKET_PROTOCOL,
|
|
|
|
|
header::HeaderValue::from_static("p1, p2, p3"),
|
|
|
|
|
)
|
|
|
|
|
.to_http_request();
|
|
|
|
|
|
|
|
|
|
let protocols = vec!["p3", "p2"];
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
StatusCode::SWITCHING_PROTOCOLS,
|
|
|
|
|
handshake_with_protocols(&req, &protocols)
|
|
|
|
|
.unwrap()
|
|
|
|
|
.finish()
|
|
|
|
|
.status()
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
Some(&header::HeaderValue::from_static("p2")),
|
|
|
|
|
handshake_with_protocols(&req, &protocols)
|
|
|
|
|
.unwrap()
|
|
|
|
|
.finish()
|
|
|
|
|
.headers()
|
|
|
|
|
.get(&header::SEC_WEBSOCKET_PROTOCOL)
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|