mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-28 09:42:40 +01:00
remove unsafe
This commit is contained in:
parent
80339147b9
commit
05a43a855e
@ -232,19 +232,17 @@ where
|
|||||||
let mut io = false;
|
let mut io = false;
|
||||||
let mut idx = 0;
|
let mut idx = 0;
|
||||||
while idx < self.tasks.len() {
|
while idx < self.tasks.len() {
|
||||||
let item: &mut Entry<H> = unsafe { &mut *(&mut self.tasks[idx] as *mut _) };
|
|
||||||
|
|
||||||
// only one task can do io operation in http/1
|
// only one task can do io operation in http/1
|
||||||
if !io && !item.flags.contains(EntryFlags::EOF) {
|
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF) {
|
||||||
// io is corrupted, send buffer
|
// io is corrupted, send buffer
|
||||||
if item.flags.contains(EntryFlags::ERROR) {
|
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
|
||||||
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
||||||
return Ok(Async::NotReady);
|
return Ok(Async::NotReady);
|
||||||
}
|
}
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
|
|
||||||
match item.pipe.poll_io(&mut self.stream) {
|
match self.tasks[idx].pipe.poll_io(&mut self.stream) {
|
||||||
Ok(Async::Ready(ready)) => {
|
Ok(Async::Ready(ready)) => {
|
||||||
// override keep-alive state
|
// override keep-alive state
|
||||||
if self.stream.keepalive() {
|
if self.stream.keepalive() {
|
||||||
@ -256,9 +254,11 @@ where
|
|||||||
self.stream.reset();
|
self.stream.reset();
|
||||||
|
|
||||||
if ready {
|
if ready {
|
||||||
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
|
self.tasks[idx]
|
||||||
|
.flags
|
||||||
|
.insert(EntryFlags::EOF | EntryFlags::FINISHED);
|
||||||
} else {
|
} else {
|
||||||
item.flags.insert(EntryFlags::EOF);
|
self.tasks[idx].flags.insert(EntryFlags::EOF);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// no more IO for this iteration
|
// no more IO for this iteration
|
||||||
@ -273,7 +273,7 @@ where
|
|||||||
// it is not possible to recover from error
|
// it is not possible to recover from error
|
||||||
// during pipe handling, so just drop connection
|
// during pipe handling, so just drop connection
|
||||||
error!("Unhandled error: {}", err);
|
error!("Unhandled error: {}", err);
|
||||||
item.flags.insert(EntryFlags::ERROR);
|
self.tasks[idx].flags.insert(EntryFlags::ERROR);
|
||||||
|
|
||||||
// check stream state, we still can have valid data in buffer
|
// check stream state, we still can have valid data in buffer
|
||||||
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
||||||
@ -282,13 +282,15 @@ where
|
|||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if !item.flags.contains(EntryFlags::FINISHED) {
|
} else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) {
|
||||||
match item.pipe.poll_completed() {
|
match self.tasks[idx].pipe.poll_completed() {
|
||||||
Ok(Async::NotReady) => (),
|
Ok(Async::NotReady) => (),
|
||||||
Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED),
|
Ok(Async::Ready(_)) => {
|
||||||
|
self.tasks[idx].flags.insert(EntryFlags::FINISHED)
|
||||||
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.notify_disconnect();
|
self.notify_disconnect();
|
||||||
item.flags.insert(EntryFlags::ERROR);
|
self.tasks[idx].flags.insert(EntryFlags::ERROR);
|
||||||
error!("Unhandled error: {}", err);
|
error!("Unhandled error: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user