Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncSeek trait to Reader to be able to seek inside asset loaders #12547

Merged
merged 8 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion crates/bevy_asset/src/io/memory.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::io::{AssetReader, AssetReaderError, PathStream, Reader};
use bevy_utils::{BoxedFuture, HashMap};
use futures_io::AsyncRead;
use futures_io::{AsyncRead, AsyncSeek};
use futures_lite::{ready, Stream};
use parking_lot::RwLock;
use std::io::SeekFrom;
use std::{
path::{Path, PathBuf},
pin::Pin,
Expand Down Expand Up @@ -236,6 +237,31 @@ impl AsyncRead for DataReader {
}
}

impl AsyncSeek for DataReader {
fn poll_seek(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
let new_pos = match pos {
SeekFrom::Start(offset) => offset as i64,
BeastLe9enD marked this conversation as resolved.
Show resolved Hide resolved
SeekFrom::End(offset) => self.data.value().len() as i64 - offset,
SeekFrom::Current(offset) => self.bytes_read as i64 + offset,
};

if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
}
}

impl AssetReader for MemoryAssetReader {
fn read<'a>(
&'a self,
Expand Down
92 changes: 90 additions & 2 deletions crates/bevy_asset/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ pub use futures_lite::{AsyncReadExt, AsyncWriteExt};
pub use source::*;

use bevy_utils::BoxedFuture;
use futures_io::{AsyncRead, AsyncWrite};
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_lite::{ready, Stream};
use std::io::SeekFrom;
use std::task::Context;
use std::{
path::{Path, PathBuf},
pin::Pin,
Expand Down Expand Up @@ -55,7 +57,11 @@ impl From<std::io::Error> for AssetReaderError {
}
}

pub type Reader<'a> = dyn AsyncRead + Unpin + Send + Sync + 'a;
pub trait AsyncReadAndSeek: AsyncRead + AsyncSeek {}

impl<T: AsyncRead + AsyncSeek> AsyncReadAndSeek for T {}

pub type Reader<'a> = dyn AsyncReadAndSeek + Unpin + Send + Sync + 'a;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is a breaking change for any third party Reader types (if any exist).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, I added a point to the migration guide :)


/// Performs read operations on an asset storage. [`AssetReader`] exposes a "virtual filesystem"
/// API, where asset bytes and asset metadata bytes are both stored and accessible for a given
Expand Down Expand Up @@ -264,6 +270,88 @@ impl AsyncRead for VecReader {
}
}

impl AsyncSeek for VecReader {
fn poll_seek(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
let new_pos = match pos {
SeekFrom::Start(offset) => offset as i64,
SeekFrom::End(offset) => self.bytes.len() as i64 - offset,
SeekFrom::Current(offset) => self.bytes_read as i64 + offset,
};

if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
}
}

/// An [`AsyncRead`] implementation capable of reading a [`&[u8]`].
pub struct SliceReader<'a> {
bytes: &'a [u8],
bytes_read: usize,
}

impl<'a> SliceReader<'a> {
/// Create a new [`SliceReader`] for `bytes`.
pub fn new(bytes: &'a [u8]) -> Self {
Self {
bytes,
bytes_read: 0,
}
}
}

impl<'a> AsyncRead for SliceReader<'a> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
if self.bytes_read >= self.bytes.len() {
Poll::Ready(Ok(0))
} else {
let n = ready!(Pin::new(&mut &self.bytes[self.bytes_read..]).poll_read(cx, buf))?;
self.bytes_read += n;
Poll::Ready(Ok(n))
}
}
}

impl<'a> AsyncSeek for SliceReader<'a> {
fn poll_seek(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
let new_pos = match pos {
SeekFrom::Start(offset) => offset as i64,
SeekFrom::End(offset) => self.bytes.len() as i64 - offset,
SeekFrom::Current(offset) => self.bytes_read as i64 + offset,
};

if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
}
}

/// Appends `.meta` to the given path.
pub(crate) fn get_meta_path(path: &Path) -> PathBuf {
let mut meta_path = path.to_path_buf();
Expand Down
14 changes: 13 additions & 1 deletion crates/bevy_asset/src/io/processor_gated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::{
use async_lock::RwLockReadGuardArc;
use bevy_utils::tracing::trace;
use bevy_utils::BoxedFuture;
use futures_io::AsyncRead;
use futures_io::{AsyncRead, AsyncSeek};
use std::io::SeekFrom;
use std::task::Poll;
use std::{path::Path, pin::Pin, sync::Arc};

/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
Expand Down Expand Up @@ -156,3 +158,13 @@ impl<'a> AsyncRead for TransactionLockedReader<'a> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}

impl<'a> AsyncSeek for TransactionLockedReader<'a> {
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
pos: SeekFrom,
) -> Poll<std::io::Result<u64>> {
Pin::new(&mut self.reader).poll_seek(cx, pos)
}
}
4 changes: 3 additions & 1 deletion crates/bevy_asset/src/processor/process.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::io::SliceReader;
use crate::{
io::{
AssetReaderError, AssetWriterError, MissingAssetWriterError,
Expand Down Expand Up @@ -346,12 +347,13 @@ impl<'a> ProcessContext<'a> {
let server = &self.processor.server;
let loader_name = std::any::type_name::<L>();
let loader = server.get_asset_loader_with_type_name(loader_name).await?;
let mut reader = SliceReader::new(self.asset_bytes);
let loaded_asset = server
.load_with_meta_loader_and_reader(
self.path,
Box::new(meta),
&*loader,
&mut self.asset_bytes,
&mut reader,
false,
true,
)
Expand Down