Path: blob/main/crates/bevy_asset/src/io/processor_gated.rs
6600 views
use crate::{1io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader},2processor::{AssetProcessorData, ProcessStatus},3AssetPath,4};5use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};6use async_lock::RwLockReadGuardArc;7use core::{pin::Pin, task::Poll};8use futures_io::AsyncRead;9use std::path::Path;10use tracing::trace;1112use super::{AsyncSeekForward, ErasedAssetReader};1314/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a15/// given path until that path has been processed by [`AssetProcessor`].16///17/// [`AssetProcessor`]: crate::processor::AssetProcessor18pub struct ProcessorGatedReader {19reader: Box<dyn ErasedAssetReader>,20source: AssetSourceId<'static>,21processor_data: Arc<AssetProcessorData>,22}2324impl ProcessorGatedReader {25/// Creates a new [`ProcessorGatedReader`].26pub fn new(27source: AssetSourceId<'static>,28reader: Box<dyn ErasedAssetReader>,29processor_data: Arc<AssetProcessorData>,30) -> Self {31Self {32source,33processor_data,34reader,35}36}3738/// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur39/// while it is held.40async fn get_transaction_lock(41&self,42path: &AssetPath<'static>,43) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {44let infos = self.processor_data.asset_infos.read().await;45let info = infos46.get(path)47.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;48Ok(info.file_transaction_lock.read_arc().await)49}50}5152impl AssetReader for ProcessorGatedReader {53async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {54let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());55trace!("Waiting for processing to finish before reading {asset_path}");56let process_result = self57.processor_data58.wait_until_processed(asset_path.clone())59.await;60match process_result {61ProcessStatus::Processed => {}62ProcessStatus::Failed | ProcessStatus::NonExistent => {63return Err(AssetReaderError::NotFound(path.to_owned()));64}65}66trace!("Processing finished with {asset_path}, reading {process_result:?}",);67let lock = self.get_transaction_lock(&asset_path).await?;68let asset_reader = self.reader.read(path).await?;69let reader = TransactionLockedReader::new(asset_reader, lock);70Ok(reader)71}7273async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {74let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());75trace!("Waiting for processing to finish before reading meta for {asset_path}",);76let process_result = self77.processor_data78.wait_until_processed(asset_path.clone())79.await;80match process_result {81ProcessStatus::Processed => {}82ProcessStatus::Failed | ProcessStatus::NonExistent => {83return Err(AssetReaderError::NotFound(path.to_owned()));84}85}86trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);87let lock = self.get_transaction_lock(&asset_path).await?;88let meta_reader = self.reader.read_meta(path).await?;89let reader = TransactionLockedReader::new(meta_reader, lock);90Ok(reader)91}9293async fn read_directory<'a>(94&'a self,95path: &'a Path,96) -> Result<Box<PathStream>, AssetReaderError> {97trace!(98"Waiting for processing to finish before reading directory {:?}",99path100);101self.processor_data.wait_until_finished().await;102trace!("Processing finished, reading directory {:?}", path);103let result = self.reader.read_directory(path).await?;104Ok(result)105}106107async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {108trace!(109"Waiting for processing to finish before reading directory {:?}",110path111);112self.processor_data.wait_until_finished().await;113trace!("Processing finished, getting directory status {:?}", path);114let result = self.reader.is_directory(path).await?;115Ok(result)116}117}118119/// An [`AsyncRead`] impl that will hold its asset's transaction lock until [`TransactionLockedReader`] is dropped.120pub struct TransactionLockedReader<'a> {121reader: Box<dyn Reader + 'a>,122_file_transaction_lock: RwLockReadGuardArc<()>,123}124125impl<'a> TransactionLockedReader<'a> {126fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {127Self {128reader,129_file_transaction_lock: file_transaction_lock,130}131}132}133134impl AsyncRead for TransactionLockedReader<'_> {135fn poll_read(136mut self: Pin<&mut Self>,137cx: &mut core::task::Context<'_>,138buf: &mut [u8],139) -> Poll<futures_io::Result<usize>> {140Pin::new(&mut self.reader).poll_read(cx, buf)141}142}143144impl AsyncSeekForward for TransactionLockedReader<'_> {145fn poll_seek_forward(146mut self: Pin<&mut Self>,147cx: &mut core::task::Context<'_>,148offset: u64,149) -> Poll<std::io::Result<u64>> {150Pin::new(&mut self.reader).poll_seek_forward(cx, offset)151}152}153154impl Reader for TransactionLockedReader<'_> {155fn read_to_end<'a>(156&'a mut self,157buf: &'a mut Vec<u8>,158) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {159self.reader.read_to_end(buf)160}161}162163164