Skip to content

Commit

Permalink
Improve async cancellation safety of future::Cache
Browse files Browse the repository at this point in the history
Reschedule write op after async cancel.
  • Loading branch information
tatsuya6502 committed Aug 26, 2023
1 parent 4b85133 commit 4194759
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 137 deletions.
2 changes: 1 addition & 1 deletion MIGRATION-GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ let cache = Cache::builder()

`async_eviction_listener` takes a closure that returns a `Future`. If you need to
`.await` something in the eviction listener, use this method. The actual return type
of the closure is `future::ListenerFuture`, which is a type alias of
of the closure is `notification::ListenerFuture`, which is a type alias of
`Pin<Box<dyn Future<Output = ()> + Send>>`. You can use the `boxed` method of
`future::FutureExt` trait to convert a regular `Future` into this type.

Expand Down
63 changes: 48 additions & 15 deletions src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_lock::Mutex;
use crossbeam_channel::Sender;
use futures_util::future::{BoxFuture, Shared};
use once_cell::sync::Lazy;
use std::{future::Future, hash::Hash, sync::Arc};
use std::{fmt, future::Future, hash::Hash, sync::Arc};
use triomphe::Arc as TrioArc;

use crate::common::{
Expand Down Expand Up @@ -82,47 +82,71 @@ pub(crate) enum PendingOp<K, V> {
// 'static means that the future can capture only owned value and/or static
// references. No non-static references are allowed.
CallEvictionListener {
ts: Instant,
future: Shared<BoxFuture<'static, ()>>,
op: WriteOp<K, V>,
op: TrioArc<WriteOp<K, V>>,
},
SendWriteOp {
ts: Instant,
op: TrioArc<WriteOp<K, V>>,
},
#[allow(unused)]
SendWriteOp(WriteOp<K, V>),
}

struct PendingOpGuard<'a, K, V> {
pending_op_ch: &'a Sender<PendingOp<K, V>>,
ts: Instant,
future: Option<Shared<BoxFuture<'static, ()>>>,
op: Option<WriteOp<K, V>>,
op: Option<TrioArc<WriteOp<K, V>>>,
}

impl<'a, K, V> PendingOpGuard<'a, K, V> {
fn new(pending_op_ch: &'a Sender<PendingOp<K, V>>) -> Self {
fn new(pending_op_ch: &'a Sender<PendingOp<K, V>>, ts: Instant) -> Self {
Self {
pending_op_ch,
ts,
future: Default::default(),
op: Default::default(),
}
}

fn set(&mut self, future: Shared<BoxFuture<'static, ()>>, op: WriteOp<K, V>) {
fn set_future_and_op(
&mut self,
future: Shared<BoxFuture<'static, ()>>,
op: TrioArc<WriteOp<K, V>>,
) {
self.future = Some(future);
self.op = Some(op);
}

fn clear(&mut self) -> Option<WriteOp<K, V>> {
fn set_op(&mut self, op: TrioArc<WriteOp<K, V>>) {
self.op = Some(op);
}

fn unset_future(&mut self) {
self.future = None;
self.op.take()
}

fn clear(&mut self) {
self.future = None;
self.op = None;
}
}

impl<'a, K, V> Drop for PendingOpGuard<'a, K, V> {
fn drop(&mut self) {
if let (Some(future), Some(op)) = (self.future.take(), self.op.take()) {
let pending_op = PendingOp::CallEvictionListener { future, op };
self.pending_op_ch
.send(pending_op)
.expect("Failed to send a pending op");
}
let pending_op = match (self.future.take(), self.op.take()) {
(Some(future), Some(op)) => PendingOp::CallEvictionListener {
ts: self.ts,
future,
op,
},
(None, Some(op)) => PendingOp::SendWriteOp { ts: self.ts, op },
_ => return,
};

self.pending_op_ch
.send(pending_op)
.expect("Failed to send a pending op");
}
}

Expand Down Expand Up @@ -156,3 +180,12 @@ pub(crate) enum WriteOp<K, V> {
},
Remove(TrioArc<KvEntry<K, V>>),
}

impl<K, V> fmt::Debug for WriteOp<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Upsert { .. } => f.debug_struct("Upsert").finish(),
Self::Remove(..) => f.debug_tuple("Remove").finish(),
}
}
}
Loading

0 comments on commit 4194759

Please sign in to comment.