From fc1623632d34ccd8a15d65b2bf47cb3cfe1e5b84 Mon Sep 17 00:00:00 2001 From: BeastLe9enD Date: Sun, 17 Mar 2024 23:42:30 +0100 Subject: [PATCH 1/4] Add support for async seek --- crates/bevy_asset/src/io/memory.rs | 28 ++++++- crates/bevy_asset/src/io/mod.rs | 92 ++++++++++++++++++++- crates/bevy_asset/src/io/processor_gated.rs | 14 +++- crates/bevy_asset/src/processor/process.rs | 4 +- 4 files changed, 133 insertions(+), 5 deletions(-) diff --git a/crates/bevy_asset/src/io/memory.rs b/crates/bevy_asset/src/io/memory.rs index cc13d0482056a..db3926661d2d4 100644 --- a/crates/bevy_asset/src/io/memory.rs +++ b/crates/bevy_asset/src/io/memory.rs @@ -1,8 +1,9 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; use bevy_utils::{BoxedFuture, HashMap}; -use futures_io::AsyncRead; +use futures_io::{AsyncRead, AsyncSeek}; use futures_lite::{ready, Stream}; use parking_lot::RwLock; +use std::io::SeekFrom; use std::{ path::{Path, PathBuf}, pin::Pin, @@ -236,6 +237,31 @@ impl AsyncRead for DataReader { } } +impl AsyncSeek for DataReader { + fn poll_seek( + mut self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + pos: SeekFrom, + ) -> Poll> { + let new_pos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => self.data.value().len() as i64 - offset, + SeekFrom::Current(offset) => self.bytes_read as i64 + offset, + }; + + if new_pos < 0 { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } else { + self.bytes_read = new_pos as _; + + Poll::Ready(Ok(new_pos as _)) + } + } +} + impl AssetReader for MemoryAssetReader { fn read<'a>( &'a self, diff --git a/crates/bevy_asset/src/io/mod.rs b/crates/bevy_asset/src/io/mod.rs index 9b8f83b0eea7f..31d0e031d7d7b 100644 --- a/crates/bevy_asset/src/io/mod.rs +++ b/crates/bevy_asset/src/io/mod.rs @@ -22,8 +22,10 @@ pub use futures_lite::{AsyncReadExt, AsyncWriteExt}; pub use source::*; use bevy_utils::BoxedFuture; -use futures_io::{AsyncRead, AsyncWrite}; +use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; use futures_lite::{ready, Stream}; +use std::io::SeekFrom; +use std::task::Context; use std::{ path::{Path, PathBuf}, pin::Pin, @@ -55,7 +57,11 @@ impl From for AssetReaderError { } } -pub type Reader<'a> = dyn AsyncRead + Unpin + Send + Sync + 'a; +pub trait AsyncReadAndSeek: AsyncRead + AsyncSeek {} + +impl AsyncReadAndSeek for T {} + +pub type Reader<'a> = dyn AsyncReadAndSeek + Unpin + Send + Sync + 'a; /// Performs read operations on an asset storage. [`AssetReader`] exposes a "virtual filesystem" /// API, where asset bytes and asset metadata bytes are both stored and accessible for a given @@ -264,6 +270,88 @@ impl AsyncRead for VecReader { } } +impl AsyncSeek for VecReader { + fn poll_seek( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + let new_pos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => self.bytes.len() as i64 - offset, + SeekFrom::Current(offset) => self.bytes_read as i64 + offset, + }; + + if new_pos < 0 { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } else { + self.bytes_read = new_pos as _; + + Poll::Ready(Ok(new_pos as _)) + } + } +} + +/// An [`AsyncRead`] implementation capable of reading a [`&[u8]`]. +pub struct SliceReader<'a> { + bytes: &'a [u8], + bytes_read: usize, +} + +impl<'a> SliceReader<'a> { + /// Create a new [`SliceReader`] for `bytes`. + pub fn new(bytes: &'a [u8]) -> Self { + Self { + bytes, + bytes_read: 0, + } + } +} + +impl<'a> AsyncRead for SliceReader<'a> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if self.bytes_read >= self.bytes.len() { + Poll::Ready(Ok(0)) + } else { + let n = ready!(Pin::new(&mut &self.bytes[self.bytes_read..]).poll_read(cx, buf))?; + self.bytes_read += n; + Poll::Ready(Ok(n)) + } + } +} + +impl<'a> AsyncSeek for SliceReader<'a> { + fn poll_seek( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + let new_pos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => self.bytes.len() as i64 - offset, + SeekFrom::Current(offset) => self.bytes_read as i64 + offset, + }; + + if new_pos < 0 { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } else { + self.bytes_read = new_pos as _; + + Poll::Ready(Ok(new_pos as _)) + } + } +} + /// Appends `.meta` to the given path. pub(crate) fn get_meta_path(path: &Path) -> PathBuf { let mut meta_path = path.to_path_buf(); diff --git a/crates/bevy_asset/src/io/processor_gated.rs b/crates/bevy_asset/src/io/processor_gated.rs index b86460f04b6fd..f99678713b916 100644 --- a/crates/bevy_asset/src/io/processor_gated.rs +++ b/crates/bevy_asset/src/io/processor_gated.rs @@ -6,7 +6,9 @@ use crate::{ use async_lock::RwLockReadGuardArc; use bevy_utils::tracing::trace; use bevy_utils::BoxedFuture; -use futures_io::AsyncRead; +use futures_io::{AsyncRead, AsyncSeek}; +use std::io::SeekFrom; +use std::task::Poll; use std::{path::Path, pin::Pin, sync::Arc}; /// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a @@ -156,3 +158,13 @@ impl<'a> AsyncRead for TransactionLockedReader<'a> { Pin::new(&mut self.reader).poll_read(cx, buf) } } + +impl<'a> AsyncSeek for TransactionLockedReader<'a> { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + pos: SeekFrom, + ) -> Poll> { + Pin::new(&mut self.reader).poll_seek(cx, pos) + } +} diff --git a/crates/bevy_asset/src/processor/process.rs b/crates/bevy_asset/src/processor/process.rs index 75b10acfa2640..ec3bf9cff6dfe 100644 --- a/crates/bevy_asset/src/processor/process.rs +++ b/crates/bevy_asset/src/processor/process.rs @@ -1,3 +1,4 @@ +use crate::io::SliceReader; use crate::{ io::{ AssetReaderError, AssetWriterError, MissingAssetWriterError, @@ -346,12 +347,13 @@ impl<'a> ProcessContext<'a> { let server = &self.processor.server; let loader_name = std::any::type_name::(); let loader = server.get_asset_loader_with_type_name(loader_name).await?; + let mut reader = SliceReader::new(self.asset_bytes); let loaded_asset = server .load_with_meta_loader_and_reader( self.path, Box::new(meta), &*loader, - &mut self.asset_bytes, + &mut reader, false, true, ) From 45e97b78c05359cea6e76599567956d8157f2efb Mon Sep 17 00:00:00 2001 From: BeastLe9enD Date: Tue, 19 Mar 2024 11:18:55 +0100 Subject: [PATCH 2/4] Replace as calls with try_into --- crates/bevy_asset/src/io/memory.rs | 33 +++++++++++++----- crates/bevy_asset/src/io/mod.rs | 56 ++++++++++++++++++++---------- 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/crates/bevy_asset/src/io/memory.rs b/crates/bevy_asset/src/io/memory.rs index db3926661d2d4..1607290fe8223 100644 --- a/crates/bevy_asset/src/io/memory.rs +++ b/crates/bevy_asset/src/io/memory.rs @@ -243,21 +243,36 @@ impl AsyncSeek for DataReader { _cx: &mut std::task::Context<'_>, pos: SeekFrom, ) -> Poll> { - let new_pos = match pos { - SeekFrom::Start(offset) => offset as i64, - SeekFrom::End(offset) => self.data.value().len() as i64 - offset, - SeekFrom::Current(offset) => self.bytes_read as i64 + offset, + let result = match pos { + SeekFrom::Start(offset) => offset.try_into(), + SeekFrom::End(offset) => self + .data + .value() + .len() + .try_into() + .map(|len: i64| len - offset), + SeekFrom::Current(offset) => self + .bytes_read + .try_into() + .map(|bytes_read: i64| bytes_read + offset), }; - if new_pos < 0 { + if let Ok(new_pos) = result { + if new_pos < 0 { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } else { + self.bytes_read = new_pos as _; + + Poll::Ready(Ok(new_pos as _)) + } + } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, "seek position is out of range", ))) - } else { - self.bytes_read = new_pos as _; - - Poll::Ready(Ok(new_pos as _)) } } } diff --git a/crates/bevy_asset/src/io/mod.rs b/crates/bevy_asset/src/io/mod.rs index 31d0e031d7d7b..34eb532d5e5f6 100644 --- a/crates/bevy_asset/src/io/mod.rs +++ b/crates/bevy_asset/src/io/mod.rs @@ -276,21 +276,31 @@ impl AsyncSeek for VecReader { _cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll> { - let new_pos = match pos { - SeekFrom::Start(offset) => offset as i64, - SeekFrom::End(offset) => self.bytes.len() as i64 - offset, - SeekFrom::Current(offset) => self.bytes_read as i64 + offset, + let result = match pos { + SeekFrom::Start(offset) => offset.try_into(), + SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset), + SeekFrom::Current(offset) => self + .bytes_read + .try_into() + .map(|bytes_read: i64| bytes_read + offset), }; - if new_pos < 0 { + if let Ok(new_pos) = result { + if new_pos < 0 { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } else { + self.bytes_read = new_pos as _; + + Poll::Ready(Ok(new_pos as _)) + } + } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, "seek position is out of range", ))) - } else { - self.bytes_read = new_pos as _; - - Poll::Ready(Ok(new_pos as _)) } } } @@ -333,21 +343,31 @@ impl<'a> AsyncSeek for SliceReader<'a> { _cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll> { - let new_pos = match pos { - SeekFrom::Start(offset) => offset as i64, - SeekFrom::End(offset) => self.bytes.len() as i64 - offset, - SeekFrom::Current(offset) => self.bytes_read as i64 + offset, + let result = match pos { + SeekFrom::Start(offset) => offset.try_into(), + SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset), + SeekFrom::Current(offset) => self + .bytes_read + .try_into() + .map(|bytes_read: i64| bytes_read + offset), }; - if new_pos < 0 { + if let Ok(new_pos) = result { + if new_pos < 0 { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seek position is out of range", + ))) + } else { + self.bytes_read = new_pos as _; + + Poll::Ready(Ok(new_pos as _)) + } + } else { Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, "seek position is out of range", ))) - } else { - self.bytes_read = new_pos as _; - - Poll::Ready(Ok(new_pos as _)) } } } From a206876acbab8749e768c4fb68d601bf226ed544 Mon Sep 17 00:00:00 2001 From: BeastLe9enD Date: Fri, 29 Mar 2024 22:21:21 +0100 Subject: [PATCH 3/4] Add AsyncSeek impl for FileReader --- crates/bevy_asset/src/io/file/sync_file_asset.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/bevy_asset/src/io/file/sync_file_asset.rs b/crates/bevy_asset/src/io/file/sync_file_asset.rs index 426472150167e..9cef053e5fd8c 100644 --- a/crates/bevy_asset/src/io/file/sync_file_asset.rs +++ b/crates/bevy_asset/src/io/file/sync_file_asset.rs @@ -1,4 +1,4 @@ -use futures_io::{AsyncRead, AsyncWrite}; +use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; use futures_lite::Stream; use crate::io::{ @@ -30,6 +30,18 @@ impl AsyncRead for FileReader { } } +impl AsyncSeek for FileReader { + fn poll_seek( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + pos: std::io::SeekFrom, + ) -> Poll> { + let this = self.get_mut(); + let seek = this.0.seek(pos); + Poll::Ready(seek) + } +} + struct FileWriter(File); impl AsyncWrite for FileWriter { From fc79bf153590f227401bfd59cef1a8ca841d8f93 Mon Sep 17 00:00:00 2001 From: BeastLe9enD Date: Fri, 29 Mar 2024 22:26:37 +0100 Subject: [PATCH 4/4] fix --- crates/bevy_asset/src/io/file/sync_file_asset.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_asset/src/io/file/sync_file_asset.rs b/crates/bevy_asset/src/io/file/sync_file_asset.rs index 9cef053e5fd8c..83eacd5649bd4 100644 --- a/crates/bevy_asset/src/io/file/sync_file_asset.rs +++ b/crates/bevy_asset/src/io/file/sync_file_asset.rs @@ -8,7 +8,7 @@ use crate::io::{ use std::{ fs::{read_dir, File}, - io::{Read, Write}, + io::{Read, Seek, Write}, path::{Path, PathBuf}, pin::Pin, task::Poll,