Skip to main content
Module

x/deno/core/async_cancel.rs

A modern runtime for JavaScript and TypeScript.
Go to Latest
File
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use crate::RcLike;use crate::Resource;use futures::future::FusedFuture;use futures::future::Future;use futures::future::TryFuture;use futures::task::Context;use futures::task::Poll;use pin_project::pin_project;use std::any::type_name;use std::borrow::Cow;use std::error::Error;use std::fmt;use std::fmt::Display;use std::fmt::Formatter;use std::io;use std::pin::Pin;use std::rc::Rc;
use self::internal as i;
/// A handle to which cancelable futures can be bound.
///
/// Futures are bound with `CancelFuture::or_cancel()` or
/// `CancelTryFuture::try_or_cancel()`. Calling `cancel()` marks the handle as
/// canceled and wakes the futures that are registered with it.
#[derive(Debug, Default)]
pub struct CancelHandle {
  // Head node of the chain that registered `Cancelable` futures link into.
  node: i::Node,
}
impl CancelHandle {
  /// Creates a new handle that has not been canceled.
  pub fn new() -> Self {
    Self::default()
  }

  /// Creates a new handle and wraps it in an `Rc`.
  pub fn new_rc() -> Rc<Self> {
    Rc::new(Self::default())
  }

  /// Cancel all cancelable futures that are bound to this handle. Note that
  /// this method does not require a mutable reference to the `CancelHandle`.
  pub fn cancel(&self) {
    let head = &self.node;
    head.cancel();
  }

  /// Returns `true` if the head node of this handle has been marked as
  /// canceled.
  pub fn is_canceled(&self) -> bool {
    let head = &self.node;
    head.is_canceled()
  }
}
/// A future wrapper that resolves to `Err(Canceled)` when the `CancelHandle`
/// it is bound to gets canceled, and to `Ok(output)` when the wrapped future
/// completes first.
#[pin_project(project = CancelableProjection)]
#[derive(Debug)]
pub enum Cancelable<F> {
  /// The wrapped future has neither completed nor been canceled yet.
  Pending {
    /// The wrapped future.
    #[pin]
    future: F,
    /// Tracks the link between this future and its `CancelHandle`.
    #[pin]
    registration: i::Registration,
  },
  /// The future has produced its result; polling it again panics.
  Terminated,
}
impl<F: Future> Future for Cancelable<F> { type Output = Result<F::Output, Canceled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let poll_result = match self.as_mut().project() { CancelableProjection::Pending { future, registration, } => Self::poll_pending(future, registration, cx), CancelableProjection::Terminated => { panic!("{}::poll() called after completion", type_name::<Self>()) } }; // Fuse: if this Future is completed or canceled, make sure the inner // `future` and `registration` fields are dropped in order to unlink it from // its cancel handle. if matches!(poll_result, Poll::Ready(_)) { self.set(Cancelable::Terminated) } poll_result }}
impl<F: Future> FusedFuture for Cancelable<F> {
  /// Reports whether this future has already produced its result.
  fn is_terminated(&self) -> bool {
    match self {
      Self::Terminated => true,
      Self::Pending { .. } => false,
    }
  }
}
impl Resource for CancelHandle { fn name(&self) -> Cow<str> { "cancellation".into() }
fn close(self: Rc<Self>) { self.cancel(); }}
/// Like `Cancelable`, but for futures whose output is already a `Result`.
/// A cancellation is converted into the future's own error type instead of
/// adding another `Result` layer.
#[pin_project(project = TryCancelableProjection)]
#[derive(Debug)]
pub struct TryCancelable<F> {
  /// The underlying cancelable future whose result gets flattened.
  #[pin]
  inner: Cancelable<F>,
}
impl<F, T, E> Future for TryCancelable<F>where F: Future<Output = Result<T, E>>, Canceled: Into<E>,{ type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let TryCancelableProjection { inner } = self.project(); match inner.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(result)) => Poll::Ready(result), Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), } }}
impl<F, T, E> FusedFuture for TryCancelable<F>where F: Future<Output = Result<T, E>>, Canceled: Into<E>,{ fn is_terminated(&self) -> bool { self.inner.is_terminated() }}
pub trait CancelFuturewhere Self: Future + Sized,{ fn or_cancel<H: RcLike<CancelHandle>>( self, cancel_handle: H, ) -> Cancelable<Self> { Cancelable::new(self, cancel_handle.into()) }}
impl<F> CancelFuture for F where F: Future {}
pub trait CancelTryFuturewhere Self: TryFuture + Sized, Canceled: Into<Self::Error>,{ fn try_or_cancel<H: RcLike<CancelHandle>>( self, cancel_handle: H, ) -> TryCancelable<Self> { TryCancelable::new(self, cancel_handle.into()) }}
impl<F> CancelTryFuture for Fwhere F: TryFuture, Canceled: Into<F::Error>,{}
/// Error value produced when an operation was canceled.
#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
pub struct Canceled;

impl Display for Canceled {
  /// Writes the fixed message "operation canceled".
  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    f.write_str("operation canceled")
  }
}

impl Error for Canceled {}

impl From<Canceled> for io::Error {
  /// Wraps the cancellation in an `io::Error` of kind `Interrupted`.
  fn from(canceled: Canceled) -> Self {
    Self::new(io::ErrorKind::Interrupted, canceled)
  }
}
mod internal { use super::CancelHandle; use super::Cancelable; use super::Canceled; use super::TryCancelable; use crate::RcRef; use futures::future::Future; use futures::task::Context; use futures::task::Poll; use futures::task::Waker; use pin_project::pin_project; use std::any::Any; use std::cell::UnsafeCell; use std::marker::PhantomPinned; use std::mem::replace; use std::pin::Pin; use std::ptr::NonNull; use std::rc::Rc; use std::rc::Weak;
impl<F: Future> Cancelable<F> { pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self { let head_node = RcRef::map(cancel_handle, |r| &r.node); let registration = Registration::WillRegister { head_node }; Self::Pending { future, registration, } }
pub(super) fn poll_pending( future: Pin<&mut F>, mut registration: Pin<&mut Registration>, cx: &mut Context, ) -> Poll<Result<F::Output, Canceled>> { // Do a cancellation check _before_ polling the inner future. If it has // already been canceled the inner future will not be polled. let node = match &*registration { Registration::WillRegister { head_node } => &*head_node, Registration::Registered { node } => node, }; if node.is_canceled() { return Poll::Ready(Err(Canceled)); }
match future.poll(cx) { Poll::Ready(res) => return Poll::Ready(Ok(res)), Poll::Pending => {} }
// Register this future with its `CancelHandle`, saving the `Waker` that // can be used to make the runtime poll this future when it is canceled. // When already registered, update the stored `Waker` if necessary. let head_node = match &*registration { Registration::WillRegister { .. } => { match registration.as_mut().project_replace(Default::default()) { RegistrationProjectionOwned::WillRegister { head_node } => { Some(head_node) } _ => unreachable!(), } } _ => None, }; let node = match registration.project() { RegistrationProjection::Registered { node } => node, _ => unreachable!(), }; node.register(cx.waker(), head_node)?;
Poll::Pending } }
impl<F: Future> TryCancelable<F> {
  /// Wraps `future` in a `Cancelable` bound to `cancel_handle`.
  pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
    let inner = Cancelable::new(future, cancel_handle);
    Self { inner }
  }
}
/// Tracks whether a `Cancelable` future has been linked to the head node of
/// its `CancelHandle` yet.
#[pin_project(project = RegistrationProjection, project_replace = RegistrationProjectionOwned)]
#[derive(Debug)]
pub enum Registration {
  /// Not linked yet; holds a reference to the head node that the future
  /// will be linked to.
  WillRegister {
    head_node: RcRef<Node>,
  },
  /// Holds the future's own node. `poll_pending` switches to this state just
  /// before it calls `Node::register`.
  Registered {
    #[pin]
    node: Node,
  },
}
impl Default for Registration { fn default() -> Self { Self::Registered { node: Default::default(), } } }
/// A node in a chain that connects a `CancelHandle` (head node) with the
/// `Cancelable` futures registered with it (item nodes).
#[derive(Debug)]
pub struct Node {
  // Link state; mutated through shared references, hence the `UnsafeCell`.
  inner: UnsafeCell<NodeInner>,
  // Makes `Node` `!Unpin`; linked nodes are referenced by raw pointer from
  // their neighbours.
  _pin: PhantomPinned,
}
impl Node { /// If necessary, register a `Cancelable` node with a `CancelHandle`, and /// save or update the `Waker` that can wake with this cancelable future. pub fn register( &self, waker: &Waker, head_rc: Option<RcRef<Node>>, ) -> Result<(), Canceled> { match head_rc.as_ref().map(RcRef::split) { Some((head, rc)) => { // Register this `Cancelable` node with a `CancelHandle` head node. assert_ne!(self, head); // TODO(piscisaureus): safety comment #[allow(clippy::undocumented_unsafe_blocks)] let self_inner = unsafe { &mut *self.inner.get() }; // TODO(piscisaureus): safety comment #[allow(clippy::undocumented_unsafe_blocks)] let head_inner = unsafe { &mut *head.inner.get() }; self_inner.link(waker, head_inner, rc) } None => { // This `Cancelable` has already been linked to a `CancelHandle` head // node; just update our stored `Waker` if necessary. // TODO(piscisaureus): safety comment #[allow(clippy::undocumented_unsafe_blocks)] let inner = unsafe { &mut *self.inner.get() }; inner.update_waker(waker) } } }
pub fn cancel(&self) { // TODO(piscisaureus): safety comment #[allow(clippy::undocumented_unsafe_blocks)] let inner = unsafe { &mut *self.inner.get() }; inner.cancel(); }
pub fn is_canceled(&self) -> bool { // TODO(piscisaureus): safety comment #[allow(clippy::undocumented_unsafe_blocks)] let inner = unsafe { &mut *self.inner.get() }; inner.is_canceled() } }
impl Default for Node {
  /// Creates a node that is not linked to any chain.
  fn default() -> Self {
    let inner = UnsafeCell::new(NodeInner::Unlinked);
    Self {
      inner,
      _pin: PhantomPinned,
    }
  }
}
impl Drop for Node { fn drop(&mut self) { // TODO(piscisaureus): safety comment #[allow(clippy::undocumented_unsafe_blocks)] let inner = unsafe { &mut *self.inner.get() }; inner.unlink(); } }
impl PartialEq for Node {
  /// Nodes are compared by address: a node is only equal to itself.
  fn eq(&self, other: &Self) -> bool {
    let lhs: *const Self = self;
    let rhs: *const Self = other;
    std::ptr::eq(lhs, rhs)
  }
}
/// Link state of a `Node`.
#[derive(Debug)]
enum NodeInner {
  /// The node is not part of any chain.
  Unlinked,
  /// The node is part of a circular, doubly linked chain.
  Linked {
    /// Whether this is a head node or an item node, plus per-kind data.
    kind: NodeKind,
    /// Previous node in the chain.
    prev: NonNull<NodeInner>,
    /// Next node in the chain.
    next: NonNull<NodeInner>,
  },
  /// The node has been marked for cancellation.
  Canceled,
}
impl NodeInner {
  /// Returns a raw, non-null pointer to this node.
  fn as_non_null(&mut self) -> NonNull<Self> {
    NonNull::from(self)
  }

  /// Links `self` (an item node) into the chain of `head`.
  ///
  /// * If `head` is unlinked, `head` and `self` form a new two-node chain and
  ///   `head` stores a weak reference derived from `rc_pin`.
  /// * If `head` is already a linked head node, `self` is inserted between
  ///   the head's current `prev` node and the head itself.
  /// * If `head` has been canceled, nothing is linked and `Err(Canceled)` is
  ///   returned.
  ///
  /// Panics if `self` is not in the `Unlinked` state.
  fn link(
    &mut self,
    waker: &Waker,
    head: &mut Self,
    rc_pin: &Rc<dyn Any>,
  ) -> Result<(), Canceled> {
    // The future should not have been linked to a cancel handle before.
    assert!(matches!(self, NodeInner::Unlinked));

    match head {
      NodeInner::Unlinked => {
        *head = NodeInner::Linked {
          kind: NodeKind::head(rc_pin),
          prev: self.as_non_null(),
          next: self.as_non_null(),
        };
        *self = NodeInner::Linked {
          kind: NodeKind::item(waker),
          prev: head.as_non_null(),
          next: head.as_non_null(),
        };
        Ok(())
      }
      NodeInner::Linked {
        kind: NodeKind::Head { .. },
        prev: next_prev_nn,
        ..
      } => {
        // TODO(piscisaureus): safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        let prev = unsafe { &mut *next_prev_nn.as_ptr() };
        match prev {
          NodeInner::Linked {
            kind: NodeKind::Item { .. },
            next: prev_next_nn,
            ..
          } => {
            // `replace` stores the pointer to `self` in the neighbour and
            // hands back the pointer that was there before, which becomes
            // the corresponding link of `self`.
            *self = NodeInner::Linked {
              kind: NodeKind::item(waker),
              prev: replace(next_prev_nn, self.as_non_null()),
              next: replace(prev_next_nn, self.as_non_null()),
            };
            Ok(())
          }
          _ => unreachable!(),
        }
      }
      NodeInner::Canceled => Err(Canceled),
      _ => unreachable!(),
    }
  }

  /// Replaces the stored `Waker` of a linked item node unless the stored one
  /// would already wake the same task. Does nothing for an unlinked node and
  /// returns `Err(Canceled)` for a canceled node.
  fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
    match self {
      NodeInner::Unlinked => Ok(()),
      NodeInner::Linked {
        kind: NodeKind::Item { waker },
        ..
      } => {
        if !waker.will_wake(new_waker) {
          *waker = new_waker.clone();
        }
        Ok(())
      }
      NodeInner::Canceled => Err(Canceled),
      _ => unreachable!(),
    }
  }

  /// If this node is linked to other nodes, remove it from the chain. This
  /// method is called (only) by the drop handler for `Node`. It is suitable
  /// for both 'head' and 'item' nodes.
  fn unlink(&mut self) {
    // The node itself always ends up `Unlinked`; only a previously `Linked`
    // node has neighbours that need fixing up.
    if let NodeInner::Linked {
      prev: mut prev_nn,
      next: mut next_nn,
      ..
    } = replace(self, NodeInner::Unlinked)
    {
      if prev_nn == next_nn {
        // There were only two nodes in this chain; after unlinking ourselves
        // the other node is no longer linked.
        // TODO(piscisaureus): safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        let other = unsafe { prev_nn.as_mut() };
        *other = NodeInner::Unlinked;
      } else {
        // The chain had more than two nodes.
        // TODO(piscisaureus): safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        match unsafe { prev_nn.as_mut() } {
          NodeInner::Linked {
            next: prev_next_nn, ..
          } => {
            *prev_next_nn = next_nn;
          }
          _ => unreachable!(),
        }
        // TODO(piscisaureus): safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        match unsafe { next_nn.as_mut() } {
          NodeInner::Linked {
            prev: next_prev_nn, ..
          } => {
            *next_prev_nn = prev_nn;
          }
          _ => unreachable!(),
        }
      }
    }
  }

  /// Mark this node and all linked nodes for cancellation. Note that `self`
  /// must refer to a head (`CancelHandle`) node.
  fn cancel(&mut self) {
    let mut head_nn = NonNull::from(self);

    // TODO(piscisaureus): safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    // Mark the head node as canceled.
    let mut item_nn =
      match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
        NodeInner::Linked {
          kind: NodeKind::Head { .. },
          next: next_nn,
          ..
        } => next_nn,
        NodeInner::Unlinked | NodeInner::Canceled => return,
        _ => unreachable!(),
      };

    // Cancel all item nodes in the chain, waking each stored `Waker`.
    // NOTE(review): `next_nn` is read before `waker.wake()` runs and is
    // dereferenced on the next iteration; this presumably relies on wakers
    // never dropping another node of this chain synchronously — confirm.
    while item_nn != head_nn {
      // TODO(piscisaureus): safety comment
      #[allow(clippy::undocumented_unsafe_blocks)]
      match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
        NodeInner::Linked {
          kind: NodeKind::Item { waker },
          next: next_nn,
          ..
        } => {
          waker.wake();
          item_nn = next_nn;
        }
        _ => unreachable!(),
      }
    }
  }

  /// Returns true if this node has been marked for cancellation. This method
  /// may be used with both head (`CancelHandle`) and item (`Cancelable`)
  /// nodes.
  fn is_canceled(&self) -> bool {
    match self {
      NodeInner::Unlinked | NodeInner::Linked { .. } => false,
      NodeInner::Canceled => true,
    }
  }
}
/// Distinguishes the head node of a chain from its item nodes.
#[derive(Debug)]
enum NodeKind {
  /// In a chain of linked nodes, the "head" node is owned by the
  /// `CancelHandle`. A chain usually contains at most one head node; however
  /// when a `CancelHandle` is dropped before the futures associated with it
  /// are dropped, a chain may temporarily contain no head node at all.
  Head {
    /// The `_weak_pin` field adds a weak reference to the `Rc` guarding
    /// the heap allocation that contains the `CancelHandle`. Without this
    /// extra weak reference, `Rc::get_mut()` might succeed and allow the
    /// `CancelHandle` to be moved when it isn't safe to do so.
    _weak_pin: Weak<dyn Any>,
  },
  /// Item nodes belong to `Cancelable` futures; they are linked into the
  /// chain of a `CancelHandle` head node.
  Item {
    /// If this future indeed does get canceled, the waker is needed to make
    /// sure that the canceled future gets polled as soon as possible.
    waker: Waker,
  },
}
impl NodeKind {
  /// Builds the head kind, storing a weak reference obtained from `rc_pin`.
  fn head(rc_pin: &Rc<dyn Any>) -> Self {
    Self::Head {
      _weak_pin: Rc::downgrade(rc_pin),
    }
  }

  /// Builds the item kind, storing a clone of `waker`.
  fn item(waker: &Waker) -> Self {
    Self::Item {
      waker: waker.clone(),
    }
  }
}
} // mod internal
#[cfg(test)]mod tests { use super::*; use anyhow::Error; use futures::future::pending; use futures::future::poll_fn; use futures::future::ready; use futures::future::FutureExt; use futures::future::TryFutureExt; use futures::pending; use futures::select; use futures::task::noop_waker_ref; use futures::task::Context; use futures::task::Poll; use std::convert::Infallible as Never; use std::io; use tokio::net::TcpStream; use tokio::spawn; use tokio::task::yield_now;
/// Boxes and pins `future`, erasing its concrete type so that differently
/// typed fused futures can be stored in one collection.
fn box_fused<'a, F: FusedFuture + 'a>(
  future: F,
) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
  let pinned: Pin<Box<F>> = Box::pin(future);
  pinned
}
/// Test helper: a future that returns `Poll::Pending` for the first `count`
/// polls and then resolves to `name`.
///
/// The future never registers a waker, so it only makes progress when the
/// caller polls it again of its own accord.
async fn ready_in_n(name: &str, count: usize) -> &str {
  // Count in `usize` directly; this avoids a lossy `as isize` cast and the
  // runtime sign check that such a cast would require.
  let mut remaining = count;
  poll_fn(|_| {
    if remaining == 0 {
      Poll::Ready(name)
    } else {
      remaining -= 1;
      Poll::Pending
    }
  })
  .await
}
/// Polls a set of futures bound to different cancel handles over six rounds
/// (`i` = 0 through 5), canceling some handles along the way, and checks
/// which futures report a result in each round.
#[test]
fn cancel_future() {
  let cancel_now = CancelHandle::new_rc();
  let cancel_at_0 = CancelHandle::new_rc();
  let cancel_at_1 = CancelHandle::new_rc();
  let cancel_at_4 = CancelHandle::new_rc();
  let cancel_never = CancelHandle::new_rc();

  // This handle is canceled before any future is bound to it.
  cancel_now.cancel();

  let mut futures = vec![
    box_fused(ready("A").or_cancel(&cancel_now)),
    box_fused(ready("B").or_cancel(&cancel_at_0)),
    box_fused(ready("C").or_cancel(&cancel_at_1)),
    box_fused(
      ready_in_n("D", 0)
        .or_cancel(&cancel_never)
        .try_or_cancel(&cancel_now),
    ),
    box_fused(
      ready_in_n("E", 1)
        .or_cancel(&cancel_at_1)
        .try_or_cancel(&cancel_at_1),
    ),
    box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
    box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
    box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
    box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
    box_fused(ready_in_n("J", 5).map(Ok)),
    box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
  ];

  let mut cx = Context::from_waker(noop_waker_ref());

  for i in 0..=5 {
    // Cancel the handle scheduled for this round, if there is one.
    match i {
      0 => cancel_at_0.cancel(),
      1 => cancel_at_1.cancel(),
      4 => cancel_at_4.cancel(),
      2 | 3 | 5 => {}
      _ => unreachable!(),
    }

    // Poll every future that has not terminated yet and collect the results
    // that became ready in this round, in the order of the `futures` vector.
    let results = futures
      .iter_mut()
      .filter(|fut| !fut.is_terminated())
      .filter_map(|fut| match fut.poll_unpin(&mut cx) {
        Poll::Pending => None,
        Poll::Ready(res) => Some(res),
      })
      .collect::<Vec<_>>();

    match i {
      0 => assert_eq!(
        results,
        [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
      ),
      1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
      2 => assert_eq!(results, []),
      3 => assert_eq!(results, [Ok("G")]),
      4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
      5 => assert_eq!(results, [Ok("J"), Ok("K")]),
      _ => unreachable!(),
    }
  }

  // After the last round every future must have terminated.
  assert!(!futures.into_iter().any(|fut| !fut.is_terminated()));

  // Every handle that was canceled above must report so.
  let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
  assert!(!cancel_handles.iter().any(|c| !c.is_canceled()));
}
/// Checks that `try_or_cancel()` converts a cancellation into the error type
/// of the wrapped future, for both `anyhow::Error` and `io::Error`.
#[tokio::test]
async fn cancel_try_future() {
  {
    // Cancel a spawned task before it actually runs.
    let cancel_handle = Rc::new(CancelHandle::new());
    let future = spawn(async { panic!("the task should not be spawned") })
      .map_err(Error::from)
      .try_or_cancel(&cancel_handle);
    cancel_handle.cancel();
    let error = future.await.unwrap_err();
    assert!(error.downcast_ref::<Canceled>().is_some());
    assert_eq!(error.to_string().as_str(), "operation canceled");
  }

  {
    // Cancel a network I/O future right after polling it.
    let cancel_handle = Rc::new(CancelHandle::new());
    // Each loop iteration builds a fresh connect future. When it is not
    // ready, the `default` arm cancels the handle and the loop runs again.
    let result = loop {
      select! {
        r = TcpStream::connect("1.2.3.4:12345")
          .try_or_cancel(&cancel_handle) => break r,
        default => cancel_handle.cancel(),
      };
    };
    let error = result.unwrap_err();
    assert_eq!(error.kind(), io::ErrorKind::Interrupted);
    assert_eq!(error.to_string().as_str(), "operation canceled");
  }
}
#[tokio::test] async fn future_cancels_itself_before_completion() { // A future cancels itself before it reaches completion. This future should // indeed get canceled and should not be polled again. let cancel_handle = CancelHandle::new_rc(); let result = async { cancel_handle.cancel(); yield_now().await; unreachable!(); } .or_cancel(&cancel_handle) .await; assert_eq!(result.unwrap_err(), Canceled); }
#[tokio::test] async fn future_cancels_itself_and_hangs() { // A future cancels itself, after which it returns `Poll::Pending` without // setting up a waker that would allow it to make progress towards // completion. Nevertheless, the `Cancelable` wrapper future must finish. let cancel_handle = CancelHandle::new_rc(); let result = async { yield_now().await; cancel_handle.cancel(); pending!(); unreachable!(); } .or_cancel(&cancel_handle) .await; assert_eq!(result.unwrap_err(), Canceled); }
#[tokio::test] async fn future_cancels_itself_and_completes() { // A TryFuture attempts to cancel itself while it is getting polled, and // yields a result from the very same `poll()` call. Because this future // actually reaches completion, the attempted cancellation has no effect. let cancel_handle = CancelHandle::new_rc(); let result = async { yield_now().await; cancel_handle.cancel(); Ok::<_, io::Error>("done") } .try_or_cancel(&cancel_handle) .await; assert_eq!(result.unwrap(), "done"); }
/// Uses `Rc::get_mut()` as a probe for whether the `CancelHandle` could be
/// moved: it must fail while a future is bound or linked to the handle, and
/// succeed again once the handle has been canceled.
#[test]
fn cancel_handle_pinning() {
  let mut cancel_handle = CancelHandle::new_rc();

  // There is only one reference to `cancel_handle`, so `Rc::get_mut()` should
  // succeed.
  assert!(Rc::get_mut(&mut cancel_handle).is_some());

  let mut future = pending::<Never>().or_cancel(&cancel_handle);
  // SAFETY: `Cancelable` pins the future
  let future = unsafe { Pin::new_unchecked(&mut future) };

  // There are two `Rc<CancelHandle>` references now, so this fails.
  assert!(Rc::get_mut(&mut cancel_handle).is_none());

  let mut cx = Context::from_waker(noop_waker_ref());
  assert!(future.poll(&mut cx).is_pending());

  // Polling `future` has established a link between the future and
  // `cancel_handle`, so both values should be pinned at this point.
  assert!(Rc::get_mut(&mut cancel_handle).is_none());

  cancel_handle.cancel();

  // Canceling or dropping the associated future(s) unlinks them from the
  // cancel handle, therefore `cancel_handle` can now safely be moved again.
  assert!(Rc::get_mut(&mut cancel_handle).is_some());
}
} // mod tests