GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_asset/src/processor/mod.rs
//! Asset processing in Bevy is a framework for automatically transforming artist-authored assets into the format that best suits the needs of your particular game.
//!
//! You can think of the asset processing system as a "build system" for assets.
//! When an artist adds a new asset to the project or an asset is changed (assuming asset hot reloading is enabled), the asset processing system will automatically perform the specified processing steps on the asset.
//! This can include things like creating lightmaps for baked lighting, compressing a `.wav` file to an `.ogg`, or generating mipmaps for a texture.
//!
//! Its core values are:
//!
//! 1. Automatic: new and changed assets should be ready to use in-game without requiring any manual conversion or cleanup steps.
//! 2. Configurable: every game has its own needs, and a high level of transparency and control is required.
//! 3. Lossless: the original asset should always be preserved, ensuring artists can make changes later.
//! 4. Deterministic: performing the same processing steps on the same asset should (generally) produce the exact same result. In cases where this doesn't make sense (steps that involve a degree of randomness or uncertainty), the results across runs should be "acceptably similar", as they will be generated once for a given set of inputs and cached.
//!
//! Taken together, this means that the original asset plus the processing steps should be enough to regenerate the final asset.
//! While it may be possible to manually edit the final asset, this should be discouraged.
//! Final post-processed assets should generally not be version-controlled, except to save developer time when recomputing heavy asset processing steps.
//!
//! # Usage
//!
//! Asset processing can be enabled or disabled in [`AssetPlugin`](crate::AssetPlugin) by setting the [`AssetMode`](crate::AssetMode).\
//! Enable Bevy's `file_watcher` feature to automatically watch for changes to assets and reprocess them.
//!
//! To register a new asset processor, use [`AssetProcessor::register_processor`].
//! To set the default asset processor for a given extension, use [`AssetProcessor::set_default_processor`].
//! In most cases, these methods will be called directly on [`App`](bevy_app::App) using the [`AssetApp`](crate::AssetApp) extension trait.
//!
//! If a default asset processor is set, assets with a matching extension will be processed using that processor before loading.
//!
//! For an end-to-end example, check out the examples in the [`examples/asset/processing`](https://github.com/bevyengine/bevy/tree/latest/examples/asset/processing) directory of the Bevy repository.
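//!
//! As a minimal sketch of enabling asset processing as described above, processed mode can be selected when adding the [`AssetPlugin`](crate::AssetPlugin).
//! This assumes the default asset folders and leaves out every other plugin an app would normally need, so treat it as an illustration rather than a complete setup.
//!
//! ```rust,no_run
//! use bevy_app::App;
//! use bevy_asset::{AssetMode, AssetPlugin};
//!
//! let mut app = App::new();
//! // Read assets from the processed folder instead of loading the sources directly.
//! app.add_plugins(AssetPlugin {
//!     mode: AssetMode::Processed,
//!     ..Default::default()
//! });
//! ```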
//!
//! # Defining asset processors
//!
//! Bevy provides two different ways to define new asset processors:
//!
//! - [`LoadTransformAndSave`] + [`AssetTransformer`](crate::transformer::AssetTransformer): a high-level API for loading, transforming, and saving assets.
//! - [`Process`]: a flexible low-level API for processing assets in arbitrary ways.
//!
//! In most cases, [`LoadTransformAndSave`] should be sufficient.

mod log;
mod process;

pub use log::*;
pub use process::*;

use crate::{
    io::{
        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
        AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
        MissingAssetSourceError,
    },
    meta::{
        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
    },
    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
    MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
};
use alloc::{borrow::ToOwned, boxed::Box, collections::VecDeque, sync::Arc, vec, vec::Vec};
use bevy_ecs::prelude::*;
use bevy_platform::collections::{HashMap, HashSet};
use bevy_tasks::IoTaskPool;
use futures_io::ErrorKind;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use parking_lot::RwLock;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tracing::{debug, error, trace, warn};

#[cfg(feature = "trace")]
use {
    alloc::string::ToString,
    bevy_tasks::ConditionalSendFuture,
    tracing::{info_span, instrument::Instrument},
};

/// A "background" asset processor that reads asset values from a source [`AssetSource`] (which corresponds to an [`AssetReader`](crate::io::AssetReader) / [`AssetWriter`](crate::io::AssetWriter) pair),
/// processes them in some way, and writes them to a destination [`AssetSource`].
///
/// This will create .meta files (a human-editable serialized form of [`AssetMeta`]) in the source [`AssetSource`] for assets
/// that can be loaded and/or processed. This enables developers to configure how each asset should be loaded and/or processed.
///
/// [`AssetProcessor`] can be run in the background while a Bevy App is running. Changes to assets will be automatically detected and hot-reloaded.
///
/// Assets will only be re-processed if they have been changed. A hash of each asset source is stored in the metadata of the processed version of the
/// asset, which is used to determine if the asset source has actually changed.
///
/// A [`ProcessorTransactionLog`] is produced, which uses "write-ahead logging" to make the [`AssetProcessor`] crash and failure resistant. If a failed/unfinished
/// transaction from a previous run is detected, the affected asset(s) will be re-processed.
///
/// [`AssetProcessor`] can be cloned. It is backed by an [`Arc`] so clones will share state. Clones can be freely used in parallel.
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    server: AssetServer,
    pub(crate) data: Arc<AssetProcessorData>,
}

/// Internal data stored inside an [`AssetProcessor`].
pub struct AssetProcessorData {
    pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
    log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
    processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
    /// Default processors for file extensions
    default_processors: RwLock<HashMap<Box<str>, &'static str>>,
    state: async_lock::RwLock<ProcessorState>,
    sources: AssetSources,
    initialized_sender: async_broadcast::Sender<()>,
    initialized_receiver: async_broadcast::Receiver<()>,
    finished_sender: async_broadcast::Sender<()>,
    finished_receiver: async_broadcast::Receiver<()>,
}

impl AssetProcessor {
    /// Creates a new [`AssetProcessor`] instance.
    pub fn new(source: &mut AssetSourceBuilders) -> Self {
        let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
        // The asset processor uses its own asset server with its own id space
        let mut sources = source.build_sources(false, false);
        sources.gate_on_processor(data.clone());
        let server = AssetServer::new_with_meta_check(
            sources,
            AssetServerMode::Processed,
            AssetMetaCheck::Always,
            false,
            UnapprovedPathMode::default(),
        );
        Self { server, data }
    }

    /// Gets a reference to the [`Arc`] containing the [`AssetProcessorData`].
    pub fn data(&self) -> &Arc<AssetProcessorData> {
        &self.data
    }

    /// The "internal" [`AssetServer`] used by the [`AssetProcessor`]. This is _separate_ from the asset server used by
    /// the main App. It has different processor-specific configuration and a different ID space.
    pub fn server(&self) -> &AssetServer {
        &self.server
    }

    async fn set_state(&self, state: ProcessorState) {
        let mut state_guard = self.data.state.write().await;
        let last_state = *state_guard;
        *state_guard = state;
        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
            self.data.finished_sender.broadcast(()).await.unwrap();
        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
            self.data.initialized_sender.broadcast(()).await.unwrap();
        }
    }

    /// Retrieves the current [`ProcessorState`].
    pub async fn get_state(&self) -> ProcessorState {
        *self.data.state.read().await
    }

    /// Retrieves the [`AssetSource`] with the given `id` for this processor.
    #[inline]
    pub fn get_source<'a>(
        &self,
        id: impl Into<AssetSourceId<'a>>,
    ) -> Result<&AssetSource, MissingAssetSourceError> {
        self.data.sources.get(id.into())
    }

    /// Retrieves the [`AssetSources`] for this processor.
    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }

    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
    async fn log_unrecoverable(&self) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.unrecoverable().await.unwrap();
    }

    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`],
    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.begin_processing(path).await.unwrap();
    }

    /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`].
    async fn log_end_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.end_processing(path).await.unwrap();
    }

    /// Starts the processor in a background thread.
    pub fn start(_processor: Res<Self>) {
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet.");
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        {
            let processor = _processor.clone();
            std::thread::spawn(move || {
                processor.process_assets();
                bevy_tasks::block_on(processor.listen_for_source_change_events());
            });
        }
    }

    /// Processes all assets. This will:
    /// * For each "processed" [`AssetSource`]:
    ///   * Scan the [`ProcessorTransactionLog`] and recover from any failures detected
    ///   * Scan the processed [`AssetReader`](crate::io::AssetReader) to build the current view of
    ///     already processed assets.
    ///   * Scan the unprocessed [`AssetReader`](crate::io::AssetReader) and remove any final
    ///     processed assets that are invalid or no longer exist.
    ///   * For each asset in the unprocessed [`AssetReader`](crate::io::AssetReader), kick off a new
    ///     "process job", which will process the asset
    ///     (if the latest version of the asset has not been processed).
    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    pub fn process_assets(&self) {
        let start_time = std::time::Instant::now();
        debug!("Processing Assets");
        IoTaskPool::get().scope(|scope| {
            scope.spawn(async move {
                self.initialize().await.unwrap();
                for source in self.sources().iter_processed() {
                    self.process_assets_internal(scope, source, PathBuf::from(""))
                        .await
                        .unwrap();
                }
            });
        });
        // This must happen _after_ the scope resolves or it will happen "too early".
        // Don't move this into the async scope above! `process_assets` is a blocking/sync function, so blocking here is fine.
        bevy_tasks::block_on(self.finish_processing_assets());
        let end_time = std::time::Instant::now();
        debug!("Processing finished in {:?}", end_time - start_time);
    }

    /// Listens for changes to assets in the source [`AssetSource`] and updates state accordingly.
    // PERF: parallelize change event processing
    pub async fn listen_for_source_change_events(&self) {
        debug!("Listening for changes to source assets");
        loop {
            let mut started_processing = false;

            for source in self.data.sources.iter_processed() {
                if let Some(receiver) = source.event_receiver() {
                    for event in receiver.try_iter() {
                        if !started_processing {
                            self.set_state(ProcessorState::Processing).await;
                            started_processing = true;
                        }

                        self.handle_asset_source_event(source, event).await;
                    }
                }
            }

            if started_processing {
                self.finish_processing_assets().await;
            }
        }
    }

    /// Writes the default meta file for the provided `path`.
    ///
    /// This function generates the appropriate meta file to process `path` with the default
    /// processor. If there is no default processor, it falls back to the default loader.
    ///
    /// Note if there is already a meta file for `path`, this function returns
    /// `Err(WriteDefaultMetaError::MetaAlreadyExists)`.
    pub async fn write_default_meta_file_for_path(
        &self,
        path: impl Into<AssetPath<'_>>,
    ) -> Result<(), WriteDefaultMetaError> {
        let path = path.into();
        let Some(processor) = path
            .get_full_extension()
            .and_then(|extension| self.get_default_processor(&extension))
        else {
            return self
                .server
                .write_default_loader_meta_file_for_path(path)
                .await;
        };

        let meta = processor.default_meta();
        let serialized_meta = meta.serialize();

        let source = self.get_source(path.source())?;

        // Note: we get the reader rather than the processed reader, since we want to write the meta
        // file for the unprocessed version of that asset (so it will be processed by the default
        // processor).
        let reader = source.reader();
        match reader.read_meta_bytes(path.path()).await {
            Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
            Err(AssetReaderError::NotFound(_)) => {
                // The meta file couldn't be found so just fall through.
            }
            Err(AssetReaderError::Io(err)) => {
                return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
            }
            Err(AssetReaderError::HttpError(err)) => {
                return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
            }
        }

        let writer = source.writer()?;
        writer
            .write_meta_bytes(path.path(), &serialized_meta)
            .await?;

        Ok(())
    }

    async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
        trace!("{event:?}");
        match event {
            AssetSourceEvent::AddedAsset(path)
            | AssetSourceEvent::AddedMeta(path)
            | AssetSourceEvent::ModifiedAsset(path)
            | AssetSourceEvent::ModifiedMeta(path) => {
                self.process_asset(source, path).await;
            }
            AssetSourceEvent::RemovedAsset(path) => {
                self.handle_removed_asset(source, path).await;
            }
            AssetSourceEvent::RemovedMeta(path) => {
                self.handle_removed_meta(source, path).await;
            }
            AssetSourceEvent::AddedFolder(path) => {
                self.handle_added_folder(source, path).await;
            }
            // NOTE: As a heads up for future devs: this event shouldn't be run in parallel with other events that might
            // touch this folder (ex: the folder might be re-created with new assets). Clean up the old state first.
            // Currently this event handler is not parallel, but it could be (and likely should be) in the future.
            AssetSourceEvent::RemovedFolder(path) => {
                self.handle_removed_folder(source, &path).await;
            }
            AssetSourceEvent::RenamedAsset { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset might need reprocessing.
                // Sometimes this event is returned when an asset is moved "back" into the asset folder
                if old == new {
                    self.process_asset(source, new).await;
                } else {
                    self.handle_renamed_asset(source, old, new).await;
                }
            }
            AssetSourceEvent::RenamedMeta { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset meta might need reprocessing.
                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
                if old == new {
                    self.process_asset(source, new).await;
                } else {
                    debug!("Meta renamed from {old:?} to {new:?}");
                    let mut infos = self.data.asset_infos.write().await;
                    // Renaming meta should not assume that an asset has also been renamed. Check both old and new assets to see
                    // if they should be re-imported (and/or have new meta generated)
                    let new_asset_path = AssetPath::from(new).with_source(source.id());
                    let old_asset_path = AssetPath::from(old).with_source(source.id());
                    infos.check_reprocess_queue.push_back(old_asset_path);
                    infos.check_reprocess_queue.push_back(new_asset_path);
                }
            }
            AssetSourceEvent::RenamedFolder { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset folder might need reprocessing.
                // Sometimes this event is returned when an asset folder is moved "back" into the asset folder
                if old == new {
                    self.handle_added_folder(source, new).await;
                } else {
                    // PERF: this reprocesses everything in the moved folder. this is not necessary in most cases, but
                    // requires some nuance when it comes to path handling.
                    self.handle_removed_folder(source, &old).await;
                    self.handle_added_folder(source, new).await;
                }
            }
            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
                let processed_reader = source.processed_reader().unwrap();
                match processed_reader.is_directory(&path).await {
                    Ok(is_directory) => {
                        if is_directory {
                            self.handle_removed_folder(source, &path).await;
                        } else if is_meta {
                            self.handle_removed_meta(source, path).await;
                        } else {
                            self.handle_removed_asset(source, path).await;
                        }
                    }
                    Err(err) => {
                        match err {
                            AssetReaderError::NotFound(_) => {
                                // if the path is not found, a processed version does not exist
                            }
                            AssetReaderError::Io(err) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to the following error: {err}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                            AssetReaderError::HttpError(status) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                        }
                    }
                }
            }
        }
    }

    async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
        debug!(
            "Folder {} was added. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet.");
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        IoTaskPool::get().scope(|scope| {
            scope.spawn(async move {
                self.process_assets_internal(scope, source, path)
                    .await
                    .unwrap();
            });
        });
    }

    /// Responds to a removed meta event by reprocessing the asset at the given path.
    async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
        // If meta was removed, we might need to regenerate it.
        // Likewise, the user might be manually re-adding the asset.
        // Therefore, we shouldn't automatically delete the asset ... that is a
        // user-initiated action.
        debug!(
            "Meta for asset {} was removed. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        self.process_asset(source, path).await;
    }

    /// Removes all processed assets stored at the given path (respecting transactionality), then removes the folder itself.
    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
        debug!(
            "Removing folder {} because source was removed",
            path.display()
        );
        let processed_reader = source.processed_reader().unwrap();
        match processed_reader.read_directory(path).await {
            Ok(mut path_stream) => {
                while let Some(child_path) = path_stream.next().await {
                    self.handle_removed_asset(source, child_path).await;
                }
            }
            Err(err) => match err {
                AssetReaderError::NotFound(_err) => {
                    // The processed folder does not exist. No need to update anything
                }
                AssetReaderError::HttpError(status) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
                    );
                }
                AssetReaderError::Io(err) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
                    );
                }
            },
        }
        let processed_writer = source.processed_writer().unwrap();
        if let Err(err) = processed_writer.remove_directory(path).await {
            match err {
                AssetWriterError::Io(err) => {
                    // we can ignore NotFound because if the "final" file in a folder was removed
                    // then we automatically clean up this folder
                    if err.kind() != ErrorKind::NotFound {
                        let asset_path = AssetPath::from_path(path).with_source(source.id());
                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
                    }
                }
            }
        }
    }

    /// Removes the processed version of an asset and associated in-memory metadata. This will block until all existing reads/writes to the
    /// asset have finished, thanks to the `file_transaction_lock`.
    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        debug!("Removing processed {asset_path} because source was removed");
        let mut infos = self.data.asset_infos.write().await;
        if let Some(info) = infos.get(&asset_path) {
            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
            // can finish their operations
            let _write_lock = info.file_transaction_lock.write();
            self.remove_processed_asset_and_meta(source, asset_path.path())
                .await;
        }
        infos.remove(&asset_path).await;
    }

    /// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
    /// This will cause direct path dependencies to break.
    async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
        let mut infos = self.data.asset_infos.write().await;
        let old = AssetPath::from(old).with_source(source.id());
        let new = AssetPath::from(new).with_source(source.id());
        let processed_writer = source.processed_writer().unwrap();
        if let Some(info) = infos.get(&old) {
            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
            // can finish their operations
            let _write_lock = info.file_transaction_lock.write();
            processed_writer
                .rename(old.path(), new.path())
                .await
                .unwrap();
            processed_writer
                .rename_meta(old.path(), new.path())
                .await
                .unwrap();
        }
        infos.rename(&old, &new).await;
    }

    async fn finish_processing_assets(&self) {
        self.try_reprocessing_queued().await;
        // clean up metadata in asset server
        self.server.data.infos.write().consume_handle_drop_events();
        self.set_state(ProcessorState::Finished).await;
    }

    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    async fn process_assets_internal<'scope>(
        &'scope self,
        scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
        source: &'scope AssetSource,
        path: PathBuf,
    ) -> Result<(), AssetReaderError> {
        if source.reader().is_directory(&path).await? {
            let mut path_stream = source.reader().read_directory(&path).await?;
            while let Some(path) = path_stream.next().await {
                Box::pin(self.process_assets_internal(scope, source, path)).await?;
            }
        } else {
            // Files without extensions are skipped
            let processor = self.clone();
            scope.spawn(async move {
                processor.process_asset(source, path).await;
            });
        }
        Ok(())
    }

    async fn try_reprocessing_queued(&self) {
        loop {
            let mut check_reprocess_queue =
                core::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
            IoTaskPool::get().scope(|scope| {
                for path in check_reprocess_queue.drain(..) {
                    let processor = self.clone();
                    let source = self.get_source(path.source()).unwrap();
                    scope.spawn(async move {
                        processor.process_asset(source, path.into()).await;
                    });
                }
            });
            let infos = self.data.asset_infos.read().await;
            if infos.check_reprocess_queue.is_empty() {
                break;
            }
        }
    }

    /// Register a new asset processor.
    pub fn register_processor<P: Process>(&self, processor: P) {
        let mut process_plans = self.data.processors.write();
        #[cfg(feature = "trace")]
        let processor = InstrumentedAssetProcessor(processor);
        process_plans.insert(core::any::type_name::<P>(), Arc::new(processor));
    }

    /// Set the default processor for the given `extension`. Make sure `P` is registered with [`AssetProcessor::register_processor`].
    pub fn set_default_processor<P: Process>(&self, extension: &str) {
        let mut default_processors = self.data.default_processors.write();
        default_processors.insert(extension.into(), core::any::type_name::<P>());
    }

    /// Returns the default processor for the given `extension`, if it exists.
    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
        let default_processors = self.data.default_processors.read();
        let key = default_processors.get(extension)?;
        self.data.processors.read().get(key).cloned()
    }

    /// Returns the processor with the given `processor_type_name`, if it exists.
    pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
        let processors = self.data.processors.read();
        processors.get(processor_type_name).cloned()
    }

    /// Populates the initial view of each asset by scanning the unprocessed and processed asset folders.
    /// This info will later be used to determine whether or not to re-process an asset.
    ///
    /// This will validate transactions and recover failed transactions when necessary.
    #[cfg_attr(
        any(target_arch = "wasm32", not(feature = "multi_threaded")),
        expect(
            dead_code,
            reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM."
        )
    )]
    async fn initialize(&self) -> Result<(), InitializeError> {
        self.validate_transaction_log_and_recover().await;
        let mut asset_infos = self.data.asset_infos.write().await;

        /// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty
        /// folders when they are discovered.
        async fn get_asset_paths(
            reader: &dyn ErasedAssetReader,
            clean_empty_folders_writer: Option<&dyn ErasedAssetWriter>,
            path: PathBuf,
            paths: &mut Vec<PathBuf>,
        ) -> Result<bool, AssetReaderError> {
            if reader.is_directory(&path).await? {
                let mut path_stream = reader.read_directory(&path).await?;
                let mut contains_files = false;

                while let Some(child_path) = path_stream.next().await {
                    contains_files |= Box::pin(get_asset_paths(
                        reader,
                        clean_empty_folders_writer,
                        child_path,
                        paths,
                    ))
                    .await?;
                }
                if !contains_files
                    && path.parent().is_some()
                    && let Some(writer) = clean_empty_folders_writer
                {
                    // it is ok for this to fail as it is just a cleanup job.
                    let _ = writer.remove_empty_directory(&path).await;
                }
                Ok(contains_files)
            } else {
                paths.push(path);
                Ok(true)
            }
        }

        for source in self.sources().iter_processed() {
            let Ok(processed_reader) = source.processed_reader() else {
                continue;
            };
            let Ok(processed_writer) = source.processed_writer() else {
                continue;
            };
            let mut unprocessed_paths = Vec::new();
            get_asset_paths(
                source.reader(),
                None,
                PathBuf::from(""),
                &mut unprocessed_paths,
            )
            .await
            .map_err(InitializeError::FailedToReadSourcePaths)?;

            let mut processed_paths = Vec::new();
            get_asset_paths(
                processed_reader,
                Some(processed_writer),
                PathBuf::from(""),
                &mut processed_paths,
            )
            .await
            .map_err(InitializeError::FailedToReadDestinationPaths)?;

            for path in unprocessed_paths {
                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
            }

            for path in processed_paths {
                let mut dependencies = Vec::new();
                let asset_path = AssetPath::from(path).with_source(source.id());
                if let Some(info) = asset_infos.get_mut(&asset_path) {
                    match processed_reader.read_meta_bytes(asset_path.path()).await {
                        Ok(meta_bytes) => {
                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
                                Ok(minimal) => {
                                    trace!(
                                        "Populated processed info for asset {asset_path} {:?}",
                                        minimal.processed_info
                                    );

                                    if let Some(processed_info) = &minimal.processed_info {
                                        for process_dependency_info in
                                            &processed_info.process_dependencies
                                        {
                                            dependencies.push(process_dependency_info.path.clone());
                                        }
                                    }
                                    info.processed_info = minimal.processed_info;
                                }
                                Err(err) => {
                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
                                    self.remove_processed_asset_and_meta(source, asset_path.path())
                                        .await;
                                }
                            }
                        }
                        Err(err) => {
                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
                            self.remove_processed_asset_and_meta(source, asset_path.path())
                                .await;
                        }
                    }
                } else {
                    trace!("Removing processed data for non-existent asset {asset_path}");
                    self.remove_processed_asset_and_meta(source, asset_path.path())
                        .await;
                }

                for dependency in dependencies {
                    asset_infos.add_dependent(&dependency, asset_path.clone());
                }
            }
        }

        self.set_state(ProcessorState::Processing).await;

        Ok(())
    }

    /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor
    /// does it remove existing in-memory metadata.
    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
            warn!("Failed to remove non-existent asset {path:?}: {err}");
        }

        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
            warn!("Failed to remove non-existent meta {path:?}: {err}");
        }

        self.clean_empty_processed_ancestor_folders(source, path)
            .await;
    }

    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
758
// As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder
759
if path.is_absolute() {
760
error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
761
return;
762
}
763
while let Some(parent) = path.parent() {
764
if parent == Path::new("") {
765
break;
766
}
767
if source
768
.processed_writer()
769
.unwrap()
770
.remove_empty_directory(parent)
771
.await
772
.is_err()
773
{
774
// if we fail to delete a folder, stop walking up the tree
775
break;
776
}
777
}
778
}
779
780
/// Processes the asset (if it has not already been processed or the asset source has changed).
781
/// If the asset has "process dependencies" (relies on the values of other assets), it will asynchronously await until
782
/// the dependencies have been processed (See [`ProcessorGatedReader`], which is used in the [`AssetProcessor`]'s [`AssetServer`]
783
/// to block reads until the asset is processed).
784
///
785
/// [`LoadContext`]: crate::loader::LoadContext
786
/// [`ProcessorGatedReader`]: crate::io::processor_gated::ProcessorGatedReader
787
async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
788
let asset_path = AssetPath::from(path).with_source(source.id());
789
let result = self.process_asset_internal(source, &asset_path).await;
790
let mut infos = self.data.asset_infos.write().await;
791
infos.finish_processing(asset_path, result).await;
792
}
793
    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        // TODO: The extension check was removed now that AssetPath is the input. is that ok?
        // TODO: check if already processing to protect against duplicate hot-reload events
        debug!("Processing {}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        // Note: we get the asset source reader first because we don't want to create meta files for assets that don't have source files
        let mut byte_reader = reader.read(path).await.map_err(reader_err)?;

        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self
                            .get_processor(&processor)
                            .ok_or_else(|| ProcessError::MissingProcessor(processor))?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let mut asset_bytes = Vec::new();
        byte_reader
            .read_to_end(&mut asset_bytes)
            .await
            .map_err(|e| ProcessError::AssetReaderError {
                path: asset_path.clone(),
                err: AssetReaderError::Io(e.into()),
            })?;

        // PERF: in theory these hashes could be streamed if we want to avoid allocating the whole asset.
        // The downside is that reading assets would need to happen twice (once for the hash and once for the asset loader)
        // Hard to say which is worse
        let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            let infos = self.data.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
                && current_processed_info.hash == new_hash
            {
                let mut dependency_changed = false;
                for current_dep_info in &current_processed_info.process_dependencies {
                    let live_hash = infos
                        .get(&current_dep_info.path)
                        .and_then(|i| i.processed_info.as_ref())
                        .map(|i| i.full_hash);
                    if live_hash != Some(current_dep_info.full_hash) {
                        dependency_changed = true;
                        break;
                    }
                }
                if !dependency_changed {
                    return Ok(ProcessResult::SkippedNotChanged);
                }
            }
        }
        // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
        // See ProcessorAssetInfo::file_transaction_lock docs for more info
        let _transaction_lock = {
            let mut infos = self.data.asset_infos.write().await;
            let info = infos.get_or_insert(asset_path.clone());
            info.file_transaction_lock.write_arc().await
        };

        // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
        // Directly writing to the asset destination in the processor necessitates this behavior
        // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures
        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context =
                    ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
                processor
                    .process(&mut context, source_meta, &mut *writer)
                    .await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            processed_writer
                .write_bytes(path, &asset_bytes)
                .await
                .map_err(writer_err)?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }

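// Illustrative sketch only (not part of this crate): the "skip if unchanged" decision made in
// `process_asset_internal`, reduced to plain data. `Hash`, `DepInfo`, `Info`, and `can_skip`
// are hypothetical stand-ins; the real code uses `AssetHash`, `ProcessedInfo`, and async locks.
// An asset can be skipped only when its own hash is unchanged AND every recorded process
// dependency still has the full hash that was recorded when the asset was last processed.

```rust
use std::collections::HashMap;

/// Stand-in for the crate's asset hash type (assumption for this sketch).
type Hash = u64;

/// A dependency as recorded when the asset was last processed.
struct DepInfo {
    path: String,
    full_hash: Hash,
}

/// Simplified processed info: own hash, combined hash, recorded dependencies.
struct Info {
    hash: Hash,
    full_hash: Hash,
    deps: Vec<DepInfo>,
}

/// Returns `true` when reprocessing `path` can be skipped.
fn can_skip(infos: &HashMap<String, Info>, path: &str, new_hash: Hash) -> bool {
    // No previous record means the asset has never been processed.
    let Some(current) = infos.get(path) else {
        return false;
    };
    // The source bytes or meta changed.
    if current.hash != new_hash {
        return false;
    }
    // Every dependency must still exist and have the recorded full hash.
    current
        .deps
        .iter()
        .all(|dep| infos.get(&dep.path).map(|i| i.full_hash) == Some(dep.full_hash))
}
```

// Comparing against the dependency's *full* hash means a change anywhere below a dependency
// also invalidates the assets above it.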
    async fn validate_transaction_log_and_recover(&self) {
        if let Err(err) = ProcessorTransactionLog::validate().await {
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                for source in self.sources().iter_processed() {
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
                    }
                }
            }
        }
        let mut log = self.data.log.write().await;
        *log = match ProcessorTransactionLog::new().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
        };
    }
}

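// Illustrative sketch only (not part of this crate): how a begin/end transaction log can be
// classified into the three kinds of entry errors handled above. The real log format and
// `ProcessorTransactionLog::validate` are not shown in this file, so `Entry`, `EntryError`,
// and `validate_entries` are hypothetical names and the logic is an assumption about the
// general technique, not a copy of the actual implementation.

```rust
use std::collections::HashSet;

/// One line of a hypothetical transaction log.
#[derive(Debug, PartialEq)]
enum Entry {
    Begin(String),
    End(String),
}

/// Simplified counterparts of the entry errors matched on above.
#[derive(Debug, PartialEq)]
enum EntryError {
    Duplicate(String),
    EndedMissing(String),
    Unfinished(String),
}

/// Replays the log and reports every inconsistency found.
fn validate_entries(entries: &[Entry]) -> Vec<EntryError> {
    let mut open: HashSet<&str> = HashSet::new();
    let mut errors = Vec::new();
    for entry in entries {
        match entry {
            // Beginning a transaction that is already open is a duplicate.
            Entry::Begin(path) => {
                if !open.insert(path.as_str()) {
                    errors.push(EntryError::Duplicate(path.clone()));
                }
            }
            // Ending a transaction that was never begun is an error.
            Entry::End(path) => {
                if !open.remove(path.as_str()) {
                    errors.push(EntryError::EndedMissing(path.clone()));
                }
            }
        }
    }
    // Anything still open never finished (e.g. a crash mid-write). Sort for stable output.
    let mut unfinished: Vec<&str> = open.into_iter().collect();
    unfinished.sort();
    errors.extend(
        unfinished
            .into_iter()
            .map(|path| EntryError::Unfinished(path.to_string())),
    );
    errors
}
```

// In the recovery code above, only unfinished transactions are treated as recoverable: the
// partially written asset and meta are removed so they are rebuilt on the next pass.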
impl AssetProcessorData {
    /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
    pub fn new(source: AssetSources) -> Self {
        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
        // not block if there was older state present.
        finished_sender.set_overflow(true);
        initialized_sender.set_overflow(true);

        AssetProcessorData {
            sources: source,
            finished_sender,
            finished_receiver,
            initialized_sender,
            initialized_receiver,
            state: async_lock::RwLock::new(ProcessorState::Initializing),
            log: Default::default(),
            processors: Default::default(),
            asset_infos: Default::default(),
            default_processors: Default::default(),
        }
    }

    /// Returns a future that will not finish until the path has been processed.
    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
        self.wait_until_initialized().await;
        let mut receiver = {
            let infos = self.asset_infos.write().await;
            let info = infos.get(&path);
            match info {
                Some(info) => match info.status {
                    Some(result) => return result,
                    // This receiver must be created prior to releasing the `asset_infos` lock to ensure this is transactional
                    None => info.status_receiver.clone(),
                },
                None => return ProcessStatus::NonExistent,
            }
        };
        receiver.recv().await.unwrap()
    }

    /// Returns a future that will not finish until the processor has been initialized.
    pub async fn wait_until_initialized(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing => {
                    // This receiver must be created prior to losing the read lock to ensure this is transactional
                    Some(self.initialized_receiver.clone())
                }
                _ => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }

    /// Returns a future that will not finish until processing has finished.
    pub async fn wait_until_finished(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing | ProcessorState::Processing => {
                    // This receiver must be created prior to losing the read lock to ensure this is transactional
                    Some(self.finished_receiver.clone())
                }
                ProcessorState::Finished => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }
}

#[cfg(feature = "trace")]
struct InstrumentedAssetProcessor<T>(T);

#[cfg(feature = "trace")]
impl<T: Process> Process for InstrumentedAssetProcessor<T> {
    type Settings = T::Settings;
    type OutputLoader = T::OutputLoader;

    fn process(
        &self,
        context: &mut ProcessContext,
        meta: AssetMeta<(), Self>,
        writer: &mut crate::io::Writer,
    ) -> impl ConditionalSendFuture<
        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
    > {
        // Change the processor type for the `AssetMeta`, which works because we share the `Settings` type.
        let meta = AssetMeta {
            meta_format_version: meta.meta_format_version,
            processed_info: meta.processed_info,
            asset: meta.asset,
        };
        let span = info_span!(
            "asset processing",
            processor = core::any::type_name::<T>(),
            asset = context.path().to_string(),
        );
        self.0.process(context, meta, writer).instrument(span)
    }
}

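/// The (successful) result of processing an asset
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// The asset was written to the processed destination (run through its processor, or copied as-is when it has none).
    Processed(ProcessedInfo),
    /// The asset's hash and the full hashes of its process dependencies were unchanged, so processing was skipped.
    SkippedNotChanged,
    /// The asset's meta file specified the `Ignore` action, so nothing was written.
    Ignored,
}

/// The final status of processing an asset
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    /// The asset was processed, or was skipped because it was unchanged.
    Processed,
    /// Processing the asset returned an error.
    Failed,
    /// The processor has no record of an asset at this path.
    NonExistent,
}
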
// NOTE: if you add new fields to this struct, make sure they are propagated (when relevant) in ProcessorAssetInfos::rename
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    processed_info: Option<ProcessedInfo>,
    /// Paths of assets that depend on this asset when they are being processed.
    dependents: HashSet<AssetPath<'static>>,
    status: Option<ProcessStatus>,
    /// A lock that controls read/write access to processed asset files. The lock is shared for both the asset bytes and the meta bytes.
    /// _This lock must be locked whenever a read or write to processed assets occurs._
    /// There are scenarios where processed assets (and their metadata) are being read and written in multiple places at once:
    /// * when the processor is running in parallel with an app
    /// * when processing assets in parallel, the processor might read an asset's `process_dependencies` when processing new versions of those dependencies
    ///     * this second scenario almost certainly isn't possible with the current implementation, but it's worth protecting against
    ///
    /// This lock defends against those scenarios by ensuring readers don't read while processed files are being written.
    /// Because this lock is shared across meta and asset bytes, readers can ensure they don't read "old" versions of metadata with "new" asset data.
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    status_sender: async_broadcast::Sender<ProcessStatus>,
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}

impl Default for ProcessorAssetInfo {
    fn default() -> Self {
        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
        // not block if there was older state present.
        status_sender.set_overflow(true);
        Self {
            processed_info: Default::default(),
            dependents: Default::default(),
            file_transaction_lock: Default::default(),
            status: None,
            status_sender,
            status_receiver,
        }
    }
}

impl ProcessorAssetInfo {
    async fn update_status(&mut self, status: ProcessStatus) {
        if self.status != Some(status) {
            self.status = Some(status);
            self.status_sender.broadcast(status).await.unwrap();
        }
    }
}

/// The "current" in memory view of the asset space. This is "eventually consistent". It does not directly
/// represent the state of assets in storage, but rather a valid historical view that will gradually become more
/// consistent as events are processed.
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    /// The "current" in memory view of the asset space. During processing, if a path does not exist in this map, it should
    /// be considered non-existent.
    /// NOTE: YOU MUST USE `Self::get_or_insert` or `Self::insert` TO ADD ITEMS TO THIS COLLECTION TO ENSURE
    /// `non_existent_dependents` DATA IS CONSUMED
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    /// Dependents for assets that don't exist. This exists to track "dangling" asset references due to deleted / missing files.
    /// If the missing asset is later added, it can "resolve" these dependencies and re-compute those assets.
    /// Therefore this _must_ always be consistent with the `infos` data. If a new asset is added to `infos`, it should
    /// check this map for dependents and adopt them. If an asset is removed, it should update the dependents here.
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
    check_reprocess_queue: VecDeque<AssetPath<'static>>,
}

impl ProcessorAssetInfos {
    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
        self.infos.entry(asset_path.clone()).or_insert_with(|| {
            let mut info = ProcessorAssetInfo::default();
            // track existing dependents by resolving existing "hanging" dependents.
            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
                info.dependents = dependents;
            }
            info
        })
    }

    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
        self.infos.get(asset_path)
    }

    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
        self.infos.get_mut(asset_path)
    }

    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
        if let Some(info) = self.get_mut(asset_path) {
            info.dependents.insert(dependent);
        } else {
            let dependents = self
                .non_existent_dependents
                .entry(asset_path.clone())
                .or_default();
            dependents.insert(dependent);
        }
    }

    /// Finalize processing the asset, which will incorporate the result of the processed asset into the in-memory view of the processed assets.
    async fn finish_processing(
        &mut self,
        asset_path: AssetPath<'static>,
        result: Result<ProcessResult, ProcessError>,
    ) {
        match result {
            Ok(ProcessResult::Processed(processed_info)) => {
                debug!("Finished processing \"{}\"", asset_path);
                // clean up old dependents
                let old_processed_info = self
                    .infos
                    .get_mut(&asset_path)
                    .and_then(|i| i.processed_info.take());
                if let Some(old_processed_info) = old_processed_info {
                    self.clear_dependencies(&asset_path, old_processed_info);
                }

                // populate new dependents
                for process_dependency_info in &processed_info.process_dependencies {
                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
                }
                let info = self.get_or_insert(asset_path);
                info.processed_info = Some(processed_info);
                info.update_status(ProcessStatus::Processed).await;
                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
                for path in dependents {
                    self.check_reprocess_queue.push_back(path);
                }
            }
            Ok(ProcessResult::SkippedNotChanged) => {
                debug!("Skipping processing (unchanged) \"{}\"", asset_path);
                let info = self.get_mut(&asset_path).expect("info should exist");
                // NOTE: skipping an asset on a given pass doesn't mean it won't change in the future as a result
                // of a dependency being re-processed. This means apps might receive an "old" (but valid) asset first.
                // This is in the interest of fast startup times that don't block for all assets being checked + reprocessed
                // Therefore this relies on hot-reloading in the app to pick up the "latest" version of the asset
                // If "block until latest state is reflected" is required, we can easily add a less granular
                // "block until first pass finished" mode
                info.update_status(ProcessStatus::Processed).await;
            }
            Ok(ProcessResult::Ignored) => {
                debug!("Skipping processing (ignored) \"{}\"", asset_path);
            }
            Err(ProcessError::ExtensionRequired) => {
                // Skip assets without extensions
            }
            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
                trace!("No loader found for {asset_path}");
            }
            Err(ProcessError::AssetReaderError {
                err: AssetReaderError::NotFound(_),
                ..
            }) => {
                // if there is no asset source, no processing can be done
                trace!("No need to process asset {asset_path} because it does not exist");
            }
            Err(err) => {
                error!("Failed to process asset {asset_path}: {err}");
                // if this failed because a dependency could not be loaded, make sure it is reprocessed if that dependency is reprocessed
                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
                    err
                {
                    let info = self.get_mut(&asset_path).expect("info should exist");
                    info.processed_info = Some(ProcessedInfo {
                        hash: AssetHash::default(),
                        full_hash: AssetHash::default(),
                        process_dependencies: vec![],
                    });
                    self.add_dependent(dependency.path(), asset_path.to_owned());
                }

                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Failed).await;
            }
        }
    }

    /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
    async fn remove(&mut self, asset_path: &AssetPath<'static>) {
        let info = self.infos.remove(asset_path);
        if let Some(info) = info {
            if let Some(processed_info) = info.processed_info {
                self.clear_dependencies(asset_path, processed_info);
            }
            // Tell all listeners this asset does not exist
            info.status_sender
                .broadcast(ProcessStatus::NonExistent)
                .await
                .unwrap();
            if !info.dependents.is_empty() {
                error!(
                    "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                    info.dependents
                );
                self.non_existent_dependents
                    .insert(asset_path.clone(), info.dependents);
            }
        }
    }

    /// Moves the info stored at `old` to `new`, carrying over its processed info and status.
    /// Dependents of `old` are left "dangling" in `non_existent_dependents`, because their recorded dependency paths still point at `old`.
    async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
        let info = self.infos.remove(old);
        if let Some(mut info) = info {
            if !info.dependents.is_empty() {
                // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
                // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
                // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
                // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
                // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
                // TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
                // (see the remove impl).
                error!(
                    "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                    info.dependents
                );
                self.non_existent_dependents
                    .insert(old.clone(), core::mem::take(&mut info.dependents));
            }
            if let Some(processed_info) = &info.processed_info {
                // Update "dependent" lists for this asset's "process dependencies" to use new path.
                for dep in &processed_info.process_dependencies {
                    if let Some(info) = self.infos.get_mut(&dep.path) {
                        info.dependents.remove(old);
                        info.dependents.insert(new.clone());
                    } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
                    {
                        dependents.remove(old);
                        dependents.insert(new.clone());
                    }
                }
            }
            // Tell all listeners this asset no longer exists
            info.status_sender
                .broadcast(ProcessStatus::NonExistent)
                .await
                .unwrap();
            let dependents: Vec<AssetPath<'static>> = {
                let new_info = self.get_or_insert(new.clone());
                new_info.processed_info = info.processed_info;
                new_info.status = info.status;
                // Ensure things waiting on the new path are informed of the status of this asset
                if let Some(status) = new_info.status {
                    new_info.status_sender.broadcast(status).await.unwrap();
                }
                new_info.dependents.iter().cloned().collect()
            };
            // Queue the asset for a reprocess check, in case it needs new meta.
            self.check_reprocess_queue.push_back(new.clone());
            for dependent in dependents {
                // Queue dependents for reprocessing because they might have been waiting for this asset.
                self.check_reprocess_queue.push_back(dependent);
            }
        }
    }

    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
        for old_load_dep in removed_info.process_dependencies {
            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
                info.dependents.remove(asset_path);
            } else if let Some(dependents) =
                self.non_existent_dependents.get_mut(&old_load_dep.path)
            {
                dependents.remove(asset_path);
            }
        }
    }
}

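// Illustrative sketch only (not part of this crate): the bookkeeping that `get_or_insert` and
// `add_dependent` perform, with plain `String` paths instead of `AssetPath`. `Infos` and its
// fields are hypothetical simplifications. Dependents recorded against a path that does not
// exist yet are parked in `dangling`, and adopted once an entry for that path is created.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct Infos {
    /// Known assets, each mapped to the set of assets that depend on it.
    dependents: HashMap<String, HashSet<String>>,
    /// Dependents recorded against assets that are not (yet) known.
    dangling: HashMap<String, HashSet<String>>,
}

impl Infos {
    /// Returns the dependents of `path`, creating the entry if needed.
    /// A newly created entry adopts any dependents that were left dangling.
    fn get_or_insert(&mut self, path: &str) -> &mut HashSet<String> {
        let dangling = &mut self.dangling;
        self.dependents
            .entry(path.to_string())
            .or_insert_with(|| dangling.remove(path).unwrap_or_default())
    }

    /// Records that `dependent` depends on `path`.
    fn add_dependent(&mut self, path: &str, dependent: &str) {
        if let Some(set) = self.dependents.get_mut(path) {
            set.insert(dependent.to_string());
        } else {
            // `path` is unknown: remember the dependent until `path` shows up.
            self.dangling
                .entry(path.to_string())
                .or_default()
                .insert(dependent.to_string());
        }
    }
}
```

// This is why new entries must always go through `get_or_insert`: inserting directly into
// the map would leave the parked dependents behind and they would never be re-checked.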
/// The current state of the [`AssetProcessor`].
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
    /// The processor is still initializing, which involves scanning the current asset folders,
    /// constructing an in-memory view of the asset space, recovering from previous errors / crashes,
    /// and cleaning up old / unused assets.
    Initializing,
    /// The processor is currently processing assets.
    Processing,
    /// The processor has finished processing all valid assets and reporting invalid assets.
    Finished,
}

/// An error that occurs when initializing the [`AssetProcessor`].
#[derive(Error, Debug)]
pub enum InitializeError {
    #[error(transparent)]
    FailedToReadSourcePaths(AssetReaderError),
    #[error(transparent)]
    FailedToReadDestinationPaths(AssetReaderError),
    #[error("Failed to validate asset log: {0}")]
    ValidateLogError(#[from] ValidateLogError),
}