Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_asset/src/io/processor_gated.rs
9418 views
1
use crate::{
2
io::{
3
AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader, ReaderNotSeekableError,
4
SeekableReader,
5
},
6
processor::{ProcessStatus, ProcessingState},
7
AssetPath,
8
};
9
use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};
10
use async_lock::RwLockReadGuardArc;
11
use core::{pin::Pin, task::Poll};
12
use futures_io::AsyncRead;
13
use std::path::Path;
14
use tracing::trace;
15
16
use super::ErasedAssetReader;
17
18
/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
19
/// given path until that path has been processed by [`AssetProcessor`].
20
///
21
/// [`AssetProcessor`]: crate::processor::AssetProcessor
22
pub(crate) struct ProcessorGatedReader {
23
reader: Arc<dyn ErasedAssetReader>,
24
source: AssetSourceId<'static>,
25
processing_state: Arc<ProcessingState>,
26
}
27
28
impl ProcessorGatedReader {
29
/// Creates a new [`ProcessorGatedReader`].
30
pub(crate) fn new(
31
source: AssetSourceId<'static>,
32
reader: Arc<dyn ErasedAssetReader>,
33
processing_state: Arc<ProcessingState>,
34
) -> Self {
35
Self {
36
source,
37
reader,
38
processing_state,
39
}
40
}
41
}
42
43
impl AssetReader for ProcessorGatedReader {
44
async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
45
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
46
trace!("Waiting for processing to finish before reading {asset_path}");
47
let process_result = self
48
.processing_state
49
.wait_until_processed(asset_path.clone())
50
.await;
51
match process_result {
52
ProcessStatus::Processed => {}
53
ProcessStatus::Failed | ProcessStatus::NonExistent => {
54
return Err(AssetReaderError::NotFound(path.to_owned()));
55
}
56
}
57
trace!("Processing finished with {asset_path}, reading {process_result:?}",);
58
let lock = self
59
.processing_state
60
.get_transaction_lock(&asset_path)
61
.await?;
62
let asset_reader = self.reader.read(path).await?;
63
let reader = TransactionLockedReader::new(asset_reader, lock);
64
Ok(reader)
65
}
66
67
async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
68
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
69
trace!("Waiting for processing to finish before reading meta for {asset_path}",);
70
let process_result = self
71
.processing_state
72
.wait_until_processed(asset_path.clone())
73
.await;
74
match process_result {
75
ProcessStatus::Processed => {}
76
ProcessStatus::Failed | ProcessStatus::NonExistent => {
77
return Err(AssetReaderError::NotFound(path.to_owned()));
78
}
79
}
80
trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
81
let lock = self
82
.processing_state
83
.get_transaction_lock(&asset_path)
84
.await?;
85
let meta_reader = self.reader.read_meta(path).await?;
86
let reader = TransactionLockedReader::new(meta_reader, lock);
87
Ok(reader)
88
}
89
90
async fn read_directory<'a>(
91
&'a self,
92
path: &'a Path,
93
) -> Result<Box<PathStream>, AssetReaderError> {
94
trace!(
95
"Waiting for processing to finish before reading directory {:?}",
96
path
97
);
98
self.processing_state.wait_until_finished().await;
99
trace!("Processing finished, reading directory {:?}", path);
100
let result = self.reader.read_directory(path).await?;
101
Ok(result)
102
}
103
104
async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {
105
trace!(
106
"Waiting for processing to finish before reading directory {:?}",
107
path
108
);
109
self.processing_state.wait_until_finished().await;
110
trace!("Processing finished, getting directory status {:?}", path);
111
let result = self.reader.is_directory(path).await?;
112
Ok(result)
113
}
114
}
115
116
/// An [`AsyncRead`] impl that will hold its asset's transaction lock until [`TransactionLockedReader`] is dropped.
117
pub struct TransactionLockedReader<'a> {
118
reader: Box<dyn Reader + 'a>,
119
_file_transaction_lock: RwLockReadGuardArc<()>,
120
}
121
122
impl<'a> TransactionLockedReader<'a> {
123
fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {
124
Self {
125
reader,
126
_file_transaction_lock: file_transaction_lock,
127
}
128
}
129
}
130
131
impl AsyncRead for TransactionLockedReader<'_> {
132
fn poll_read(
133
mut self: Pin<&mut Self>,
134
cx: &mut core::task::Context<'_>,
135
buf: &mut [u8],
136
) -> Poll<futures_io::Result<usize>> {
137
Pin::new(&mut self.reader).poll_read(cx, buf)
138
}
139
}
140
141
impl Reader for TransactionLockedReader<'_> {
142
fn read_to_end<'a>(
143
&'a mut self,
144
buf: &'a mut Vec<u8>,
145
) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {
146
self.reader.read_to_end(buf)
147
}
148
149
fn seekable(&mut self) -> Result<&mut dyn SeekableReader, ReaderNotSeekableError> {
150
self.reader.seekable()
151
}
152
}
153
154