1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-26 20:57:43 +02:00

Replace calls to Pin::new_unchecked with pin_project.

This is a breaking change, as it changes some public methods to take
`Pin<&mut Self>` rather than `&mut self`.

This brings these methods into line with `Stream::poll_next`, which also
takes a `Pin<&mut Self>`
This commit is contained in:
Aaron Hill
2020-03-04 11:48:19 -05:00
parent 693d5132a9
commit c41b5d8dd4
7 changed files with 116 additions and 104 deletions

View File

@ -77,6 +77,7 @@ where
{
service: S,
state: State<S, U>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
tx: mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>>,
@ -169,7 +170,7 @@ where
&mut self.framed
}
fn poll_read(&mut self, cx: &mut Context<'_>) -> bool
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
@ -180,29 +181,30 @@ where
<U as Encoder>::Error: std::fmt::Debug,
{
loop {
match self.service.poll_ready(cx) {
let this = self.as_mut().project();
match this.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
let item = match self.framed.next_item(cx) {
let item = match this.framed.next_item(cx) {
Poll::Ready(Some(Ok(el))) => el,
Poll::Ready(Some(Err(err))) => {
self.state = State::FramedError(DispatcherError::Decoder(err));
*this.state = State::FramedError(DispatcherError::Decoder(err));
return true;
}
Poll::Pending => return false,
Poll::Ready(None) => {
self.state = State::Stopping;
*this.state = State::Stopping;
return true;
}
};
let tx = self.tx.clone();
actix_rt::spawn(self.service.call(item).map(move |item| {
let tx = this.tx.clone();
actix_rt::spawn(this.service.call(item).map(move |item| {
let _ = tx.send(item.map(Message::Item));
}));
}
Poll::Pending => return false,
Poll::Ready(Err(err)) => {
self.state = State::Error(DispatcherError::Service(err));
*this.state = State::Error(DispatcherError::Service(err));
return true;
}
}
@ -210,7 +212,7 @@ where
}
/// write to framed object
fn poll_write(&mut self, cx: &mut Context<'_>) -> bool
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
@ -221,33 +223,34 @@ where
<U as Encoder>::Error: std::fmt::Debug,
{
loop {
while !self.framed.is_write_buf_full() {
match Pin::new(&mut self.rx).poll_next(cx) {
let mut this = self.as_mut().project();
while !this.framed.is_write_buf_full() {
match Pin::new(&mut this.rx).poll_next(cx) {
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
if let Err(err) = self.framed.write(msg) {
self.state = State::FramedError(DispatcherError::Encoder(err));
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state = State::FramedError(DispatcherError::Encoder(err));
return true;
}
}
Poll::Ready(Some(Ok(Message::Close))) => {
self.state = State::FlushAndStop;
*this.state = State::FlushAndStop;
return true;
}
Poll::Ready(Some(Err(err))) => {
self.state = State::Error(DispatcherError::Service(err));
*this.state = State::Error(DispatcherError::Service(err));
return true;
}
Poll::Ready(None) | Poll::Pending => break,
}
}
if !self.framed.is_write_buf_empty() {
match self.framed.flush(cx) {
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
Poll::Pending => break,
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
self.state = State::FramedError(DispatcherError::Encoder(err));
*this.state = State::FramedError(DispatcherError::Encoder(err));
return true;
}
}
@ -279,7 +282,7 @@ where
return match this.state {
State::Processing => {
if self.poll_read(cx) || self.poll_write(cx) {
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
continue;
} else {
Poll::Pending
@ -287,12 +290,12 @@ where
}
State::Error(_) => {
// flush write buffer
if !self.framed.is_write_buf_empty() {
if let Poll::Pending = self.framed.flush(cx) {
if !this.framed.is_write_buf_empty() {
if let Poll::Pending = this.framed.flush(cx) {
return Poll::Pending;
}
}
Poll::Ready(Err(self.state.take_error()))
Poll::Ready(Err(this.state.take_error()))
}
State::FlushAndStop => {
if !this.framed.is_write_buf_empty() {