Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_asset/src/io/processor_gated.rs
6600 views
1
use crate::{
2
io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader},
3
processor::{AssetProcessorData, ProcessStatus},
4
AssetPath,
5
};
6
use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};
7
use async_lock::RwLockReadGuardArc;
8
use core::{pin::Pin, task::Poll};
9
use futures_io::AsyncRead;
10
use std::path::Path;
11
use tracing::trace;
12
13
use super::{AsyncSeekForward, ErasedAssetReader};
14
15
/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
16
/// given path until that path has been processed by [`AssetProcessor`].
17
///
18
/// [`AssetProcessor`]: crate::processor::AssetProcessor
19
pub struct ProcessorGatedReader {
20
reader: Box<dyn ErasedAssetReader>,
21
source: AssetSourceId<'static>,
22
processor_data: Arc<AssetProcessorData>,
23
}
24
25
impl ProcessorGatedReader {
26
/// Creates a new [`ProcessorGatedReader`].
27
pub fn new(
28
source: AssetSourceId<'static>,
29
reader: Box<dyn ErasedAssetReader>,
30
processor_data: Arc<AssetProcessorData>,
31
) -> Self {
32
Self {
33
source,
34
processor_data,
35
reader,
36
}
37
}
38
39
/// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
40
/// while it is held.
41
async fn get_transaction_lock(
42
&self,
43
path: &AssetPath<'static>,
44
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
45
let infos = self.processor_data.asset_infos.read().await;
46
let info = infos
47
.get(path)
48
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
49
Ok(info.file_transaction_lock.read_arc().await)
50
}
51
}
52
53
impl AssetReader for ProcessorGatedReader {
54
async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
55
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
56
trace!("Waiting for processing to finish before reading {asset_path}");
57
let process_result = self
58
.processor_data
59
.wait_until_processed(asset_path.clone())
60
.await;
61
match process_result {
62
ProcessStatus::Processed => {}
63
ProcessStatus::Failed | ProcessStatus::NonExistent => {
64
return Err(AssetReaderError::NotFound(path.to_owned()));
65
}
66
}
67
trace!("Processing finished with {asset_path}, reading {process_result:?}",);
68
let lock = self.get_transaction_lock(&asset_path).await?;
69
let asset_reader = self.reader.read(path).await?;
70
let reader = TransactionLockedReader::new(asset_reader, lock);
71
Ok(reader)
72
}
73
74
async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
75
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
76
trace!("Waiting for processing to finish before reading meta for {asset_path}",);
77
let process_result = self
78
.processor_data
79
.wait_until_processed(asset_path.clone())
80
.await;
81
match process_result {
82
ProcessStatus::Processed => {}
83
ProcessStatus::Failed | ProcessStatus::NonExistent => {
84
return Err(AssetReaderError::NotFound(path.to_owned()));
85
}
86
}
87
trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
88
let lock = self.get_transaction_lock(&asset_path).await?;
89
let meta_reader = self.reader.read_meta(path).await?;
90
let reader = TransactionLockedReader::new(meta_reader, lock);
91
Ok(reader)
92
}
93
94
async fn read_directory<'a>(
95
&'a self,
96
path: &'a Path,
97
) -> Result<Box<PathStream>, AssetReaderError> {
98
trace!(
99
"Waiting for processing to finish before reading directory {:?}",
100
path
101
);
102
self.processor_data.wait_until_finished().await;
103
trace!("Processing finished, reading directory {:?}", path);
104
let result = self.reader.read_directory(path).await?;
105
Ok(result)
106
}
107
108
async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {
109
trace!(
110
"Waiting for processing to finish before reading directory {:?}",
111
path
112
);
113
self.processor_data.wait_until_finished().await;
114
trace!("Processing finished, getting directory status {:?}", path);
115
let result = self.reader.is_directory(path).await?;
116
Ok(result)
117
}
118
}
119
120
/// An [`AsyncRead`] impl that will hold its asset's transaction lock until [`TransactionLockedReader`] is dropped.
121
pub struct TransactionLockedReader<'a> {
122
reader: Box<dyn Reader + 'a>,
123
_file_transaction_lock: RwLockReadGuardArc<()>,
124
}
125
126
impl<'a> TransactionLockedReader<'a> {
127
fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {
128
Self {
129
reader,
130
_file_transaction_lock: file_transaction_lock,
131
}
132
}
133
}
134
135
impl AsyncRead for TransactionLockedReader<'_> {
136
fn poll_read(
137
mut self: Pin<&mut Self>,
138
cx: &mut core::task::Context<'_>,
139
buf: &mut [u8],
140
) -> Poll<futures_io::Result<usize>> {
141
Pin::new(&mut self.reader).poll_read(cx, buf)
142
}
143
}
144
145
impl AsyncSeekForward for TransactionLockedReader<'_> {
146
fn poll_seek_forward(
147
mut self: Pin<&mut Self>,
148
cx: &mut core::task::Context<'_>,
149
offset: u64,
150
) -> Poll<std::io::Result<u64>> {
151
Pin::new(&mut self.reader).poll_seek_forward(cx, offset)
152
}
153
}
154
155
impl Reader for TransactionLockedReader<'_> {
156
fn read_to_end<'a>(
157
&'a mut self,
158
buf: &'a mut Vec<u8>,
159
) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {
160
self.reader.read_to_end(buf)
161
}
162
}
163
164