Path: blob/main/crates/bevy_asset/src/io/processor_gated.rs
9418 views
use crate::{1io::{2AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader, ReaderNotSeekableError,3SeekableReader,4},5processor::{ProcessStatus, ProcessingState},6AssetPath,7};8use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};9use async_lock::RwLockReadGuardArc;10use core::{pin::Pin, task::Poll};11use futures_io::AsyncRead;12use std::path::Path;13use tracing::trace;1415use super::ErasedAssetReader;1617/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a18/// given path until that path has been processed by [`AssetProcessor`].19///20/// [`AssetProcessor`]: crate::processor::AssetProcessor21pub(crate) struct ProcessorGatedReader {22reader: Arc<dyn ErasedAssetReader>,23source: AssetSourceId<'static>,24processing_state: Arc<ProcessingState>,25}2627impl ProcessorGatedReader {28/// Creates a new [`ProcessorGatedReader`].29pub(crate) fn new(30source: AssetSourceId<'static>,31reader: Arc<dyn ErasedAssetReader>,32processing_state: Arc<ProcessingState>,33) -> Self {34Self {35source,36reader,37processing_state,38}39}40}4142impl AssetReader for ProcessorGatedReader {43async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {44let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());45trace!("Waiting for processing to finish before reading {asset_path}");46let process_result = self47.processing_state48.wait_until_processed(asset_path.clone())49.await;50match process_result {51ProcessStatus::Processed => {}52ProcessStatus::Failed | ProcessStatus::NonExistent => {53return Err(AssetReaderError::NotFound(path.to_owned()));54}55}56trace!("Processing finished with {asset_path}, reading {process_result:?}",);57let lock = self58.processing_state59.get_transaction_lock(&asset_path)60.await?;61let asset_reader = self.reader.read(path).await?;62let reader = TransactionLockedReader::new(asset_reader, lock);63Ok(reader)64}6566async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {67let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());68trace!("Waiting for processing to finish before reading meta for {asset_path}",);69let process_result = self70.processing_state71.wait_until_processed(asset_path.clone())72.await;73match process_result {74ProcessStatus::Processed => {}75ProcessStatus::Failed | ProcessStatus::NonExistent => {76return Err(AssetReaderError::NotFound(path.to_owned()));77}78}79trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);80let lock = self81.processing_state82.get_transaction_lock(&asset_path)83.await?;84let meta_reader = self.reader.read_meta(path).await?;85let reader = TransactionLockedReader::new(meta_reader, lock);86Ok(reader)87}8889async fn read_directory<'a>(90&'a self,91path: &'a Path,92) -> Result<Box<PathStream>, AssetReaderError> {93trace!(94"Waiting for processing to finish before reading directory {:?}",95path96);97self.processing_state.wait_until_finished().await;98trace!("Processing finished, reading directory {:?}", path);99let result = self.reader.read_directory(path).await?;100Ok(result)101}102103async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {104trace!(105"Waiting for processing to finish before reading directory {:?}",106path107);108self.processing_state.wait_until_finished().await;109trace!("Processing finished, getting directory status {:?}", path);110let result = self.reader.is_directory(path).await?;111Ok(result)112}113}114115/// An [`AsyncRead`] impl that will hold its asset's transaction lock until [`TransactionLockedReader`] is dropped.116pub struct TransactionLockedReader<'a> {117reader: Box<dyn Reader + 'a>,118_file_transaction_lock: RwLockReadGuardArc<()>,119}120121impl<'a> TransactionLockedReader<'a> {122fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {123Self {124reader,125_file_transaction_lock: file_transaction_lock,126}127}128}129130impl AsyncRead for TransactionLockedReader<'_> {131fn poll_read(132mut self: Pin<&mut Self>,133cx: &mut core::task::Context<'_>,134buf: &mut [u8],135) -> Poll<futures_io::Result<usize>> {136Pin::new(&mut self.reader).poll_read(cx, buf)137}138}139140impl Reader for TransactionLockedReader<'_> {141fn read_to_end<'a>(142&'a mut self,143buf: &'a mut Vec<u8>,144) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {145self.reader.read_to_end(buf)146}147148fn seekable(&mut self) -> Result<&mut dyn SeekableReader, ReaderNotSeekableError> {149self.reader.seekable()150}151}152153154