Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lru): correctly handle future cancellation #3911

Merged
merged 6 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions src/common/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,9 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
}
}

impl<K: LruKey + Clone, T: LruValue> LruCache<K, T> {
/// Only implement `lookup_with_request_dedup` for static values, as they can be sent across tokio
/// spawned futures.
impl<K: LruKey + Clone + 'static, T: LruValue + 'static> LruCache<K, T> {
pub async fn lookup_with_request_dedup<F, E, VC>(
self: &Arc<Self>,
hash: u64,
Expand All @@ -766,25 +768,33 @@ impl<K: LruKey + Clone, T: LruValue> LruCache<K, T> {
) -> Result<Result<CachableEntry<K, T>, E>, RecvError>
where
F: FnOnce() -> VC,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the async closure is unnecessary? We only need a future here. cc @Little-Wallace

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's necessary. There are some variables that only need to be computed on a cache miss. See other parts of the PR for more information.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g., we need to call self.meta_path to compute the meta path ONLY on a cache miss. But &self is not 'static, so we can only compute it inside this FnOnce closure, outside the Future + 'static.

E: Error,
VC: Future<Output = Result<(T, usize), E>>,
E: Error + Send + 'static,
VC: Future<Output = Result<(T, usize), E>> + Send + 'static,
{
match self.lookup_for_request(hash, key.clone()) {
LookupResult::Cached(entry) => Ok(Ok(entry)),
LookupResult::WaitPendingRequest(recv) => {
let entry = recv.await?;
Ok(Ok(entry))
}
LookupResult::Miss => match fetch_value().await {
Ok((value, charge)) => {
let entry = self.insert(key, hash, charge, value);
Ok(Ok(entry))
}
Err(e) => {
self.clear_pending_request(&key, hash);
Ok(Err(e))
}
},
LookupResult::Miss => {
let this = self.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rewrite it as:

let ret = tokio::spawn(fetch_value).await;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. fetch_value itself is not 'static. The future it returns is 'static.

let fetch_value = fetch_value();
tokio::spawn(async move {
match fetch_value.await {
Ok((value, charge)) => {
let entry = this.insert(key, hash, charge, value);
Ok(Ok(entry))
}
Err(e) => {
this.clear_pending_request(&key, hash);
Ok(Err(e))
}
}
})
.await
.unwrap()
}
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions src/object_store/src/object/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,25 @@ impl DiskObjectStore {
path.hash(&mut hasher);
hasher.finish()
};
let path_when_err = path.clone();
let entry = self
.opened_read_file_cache
.lookup_with_request_dedup::<_, ObjectError, _>(hash, path.clone(), || async {
let file = utils::open_file(&path, true, false, false)
.await?
.into_std()
.await;
Ok((file, 1))
})
.lookup_with_request_dedup::<_, ObjectError, _>(
hash,
path.clone(),
move || async move {
let file = utils::open_file(&path, true, false, false)
.await?
.into_std()
.await;
Ok((file, 1))
},
)
.await
.map_err(|e| {
ObjectError::internal(format!(
"open file cache request dedup get canceled {:?}. Err{:?}",
path.to_str(),
path_when_err.to_str(),
e
))
})??;
Expand Down
16 changes: 10 additions & 6 deletions src/storage/src/hummock/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,27 @@ impl BlockCache {
))
}

pub async fn get_or_insert_with<F>(
pub async fn get_or_insert_with<F, Fut>(
&self,
sst_id: HummockSSTableId,
block_idx: u64,
f: F,
) -> HummockResult<BlockHolder>
where
F: Future<Output = HummockResult<Box<Block>>>,
F: FnOnce() -> Fut,
Fut: Future<Output = HummockResult<Box<Block>>> + Send + 'static,
{
let h = Self::hash(sst_id, block_idx);
let key = (sst_id, block_idx);
let entry = self
.inner
.lookup_with_request_dedup::<_, HummockError, _>(h, key, || async {
let block = f.await?;
let len = block.len();
Ok((block, len))
.lookup_with_request_dedup::<_, HummockError, _>(h, key, || {
let f = f();
async move {
let block = f.await?;
let len = block.len();
Ok((block, len))
}
})
.await
.map_err(|e| {
Expand Down
79 changes: 43 additions & 36 deletions src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,25 +146,29 @@ impl SstableStore {
stats: &mut StoreLocalStatistic,
) -> HummockResult<BlockHolder> {
stats.cache_data_block_total += 1;
let fetch_block = async {
let mut fetch_block = || {
stats.cache_data_block_miss += 1;
let block_meta = sst
.meta
.block_metas
.get(block_index as usize)
.ok_or_else(HummockError::invalid_block)?;
.ok_or_else(HummockError::invalid_block)
.unwrap(); // FIXME: don't unwrap here.
let block_loc = BlockLocation {
offset: block_meta.offset as usize,
size: block_meta.len as usize,
};
let data_path = self.get_sst_data_path(sst.id);
let block_data = self
.store
.read(&data_path, Some(block_loc))
.await
.map_err(HummockError::object_io_error)?;
let block = Block::decode(block_data)?;
Ok(Box::new(block))
let store = self.store.clone();

async move {
let block_data = store
.read(&data_path, Some(block_loc))
.await
.map_err(HummockError::object_io_error)?;
let block = Block::decode(block_data)?;
Ok(Box::new(block))
}
};

let disable_cache: fn() -> bool = || {
Expand All @@ -186,9 +190,9 @@ impl SstableStore {
}
CachePolicy::NotFill => match self.block_cache.get(sst.id, block_index) {
Some(block) => Ok(block),
None => fetch_block.await.map(BlockHolder::from_owned_block),
None => fetch_block().await.map(BlockHolder::from_owned_block),
},
CachePolicy::Disable => fetch_block.await.map(BlockHolder::from_owned_block),
CachePolicy::Disable => fetch_block().await.map(BlockHolder::from_owned_block),
}
}

Expand Down Expand Up @@ -236,34 +240,37 @@ impl SstableStore {
stats.cache_meta_block_total += 1;
let entry = self
.meta_cache
.lookup_with_request_dedup::<_, HummockError, _>(sst_id, sst_id, || async {
.lookup_with_request_dedup::<_, HummockError, _>(sst_id, sst_id, || {
let store = self.store.clone();
let meta_path = self.get_sst_meta_path(sst_id);
let data_path = self.get_sst_data_path(sst_id);
stats.cache_meta_block_miss += 1;
let meta = match meta_data {
Some(data) => data,
None => {
let path = self.get_sst_meta_path(sst_id);
let buf = self
.store
.read(&path, None)

async move {
let meta = match meta_data {
Some(data) => data,
None => {
let buf = store
.read(&meta_path, None)
.await
.map_err(HummockError::object_io_error)?;
SstableMeta::decode(&mut &buf[..])?
}
};
let mut size = meta.encoded_size();
let sst = if load_data {
size = meta.estimated_size as usize;

let block_data = store
.read(&data_path, None)
.await
.map_err(HummockError::object_io_error)?;
SstableMeta::decode(&mut &buf[..])?
}
};
let mut size = meta.encoded_size();
let sst = if load_data {
size = meta.estimated_size as usize;
let data_path = self.get_sst_data_path(sst_id);
let block_data = self
.store
.read(&data_path, None)
.await
.map_err(HummockError::object_io_error)?;
Sstable::new_with_data(sst_id, meta, block_data)?
} else {
Sstable::new(sst_id, meta)
};
Ok((Box::new(sst), size))
Sstable::new_with_data(sst_id, meta, block_data)?
} else {
Sstable::new(sst_id, meta)
};
Ok((Box::new(sst), size))
}
})
.await
.map_err(|e| {
Expand Down