bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_asset/src/processor/mod.rs
//! Asset processing in Bevy is a framework for automatically transforming artist-authored assets into the format that best suits the needs of your particular game.
//!
//! You can think of the asset processing system as a "build system" for assets.
//! When an artist adds a new asset to the project or an asset is changed (assuming asset hot reloading is enabled), the asset processing system will automatically perform the specified processing steps on the asset.
//! This can include things like creating lightmaps for baked lighting, compressing a `.wav` file to an `.ogg`, or generating mipmaps for a texture.
//!
//! Its core values are:
//!
//! 1. Automatic: new and changed assets should be ready to use in-game without requiring any manual conversion or cleanup steps.
//! 2. Configurable: every game has its own needs, and a high level of transparency and control is required.
//! 3. Lossless: the original asset should always be preserved, ensuring artists can make changes later.
//! 4. Deterministic: performing the same processing steps on the same asset should (generally) produce the exact same result. In cases where this doesn't make sense (steps that involve a degree of randomness or uncertainty), the results across runs should be "acceptably similar", as they will be generated once for a given set of inputs and cached.
//!
//! Taken together, this means that the original asset plus the processing steps should be enough to regenerate the final asset.
//! While it may be possible to manually edit the final asset, this should be discouraged.
//! Final post-processed assets should generally not be version-controlled, except to save developer time when recomputing heavy asset processing steps.
//!
//! # Usage
//!
//! Asset processing can be enabled or disabled in [`AssetPlugin`](crate::AssetPlugin) by setting the [`AssetMode`](crate::AssetMode).\
//! Enable Bevy's `file_watcher` feature to automatically watch for changes to assets and reprocess them.
//!
//! To register a new asset processor, use [`AssetProcessor::register_processor`].
//! To set the default asset processor for a given extension, use [`AssetProcessor::set_default_processor`].
//! In most cases, these methods will be called directly on [`App`](bevy_app::App) using the [`AssetApp`](crate::AssetApp) extension trait.
//!
//! If a default asset processor is set, assets with a matching extension will be processed using that processor before loading.
//!
//! For an end-to-end example, check out the examples in the [`examples/asset/processing`](https://github.com/bevyengine/bevy/tree/latest/examples/asset/processing) directory of the Bevy repository.
//!
//! # Defining asset processors
//!
//! Bevy provides two different ways to define new asset processors:
//!
//! - [`LoadTransformAndSave`] + [`AssetTransformer`](crate::transformer::AssetTransformer): a high-level API for loading, transforming, and saving assets.
//! - [`Process`]: a flexible low-level API for processing assets in arbitrary ways.
//!
//! In most cases, [`LoadTransformAndSave`] should be sufficient.
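//!
//! As a rough sketch, registration via the [`AssetApp`](crate::AssetApp) extension trait might look
//! like the following (`MyProcessor` is a hypothetical [`Process`] implementation used purely for
//! illustration, not a real Bevy type):
//!
//! ```ignore
//! // Hypothetical: `MyProcessor` implements `Process` and defaults to processing `.png` files.
//! app.register_asset_processor(MyProcessor::default())
//!     .set_default_asset_processor::<MyProcessor>("png");
//! ```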

mod log;
mod process;

use async_lock::RwLockReadGuardArc;
pub use log::*;
pub use process::*;

use crate::{
    io::{
        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
        AssetSources, AssetWriterError, ErasedAssetReader, MissingAssetSourceError,
    },
    meta::{
        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
    },
    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
    MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
};
use alloc::{borrow::ToOwned, boxed::Box, string::String, sync::Arc, vec, vec::Vec};
use bevy_ecs::prelude::*;
use bevy_platform::{
    collections::{hash_map::Entry, HashMap, HashSet},
    sync::{PoisonError, RwLock},
};
use bevy_tasks::IoTaskPool;
use futures_io::ErrorKind;
use futures_lite::{AsyncWriteExt, StreamExt};
use futures_util::{select_biased, FutureExt};
use std::{
    path::{Path, PathBuf},
    sync::Mutex,
};
use thiserror::Error;
use tracing::{debug, error, trace, warn};

#[cfg(feature = "trace")]
use {
    alloc::string::ToString,
    tracing::{info_span, instrument::Instrument},
};

/// A "background" asset processor that reads asset values from a source [`AssetSource`] (which corresponds to an [`AssetReader`](crate::io::AssetReader) / [`AssetWriter`](crate::io::AssetWriter) pair),
/// processes them in some way, and writes them to a destination [`AssetSource`].
///
/// This will create .meta files (a human-editable serialized form of [`AssetMeta`]) in the source [`AssetSource`] for assets
/// that can be loaded and/or processed. This enables developers to configure how each asset should be loaded and/or processed.
///
/// [`AssetProcessor`] can be run in the background while a Bevy App is running. Changes to assets will be automatically detected and hot-reloaded.
///
/// Assets will only be re-processed if they have been changed. A hash of each asset source is stored in the metadata of the processed version of the
/// asset, which is used to determine if the asset source has actually changed.
///
/// A [`ProcessorTransactionLog`] is produced, which uses "write-ahead logging" to make the [`AssetProcessor`] crash and failure resistant. If a failed/unfinished
/// transaction from a previous run is detected, the affected asset(s) will be re-processed.
///
/// [`AssetProcessor`] can be cloned. It is backed by an [`Arc`] so clones will share state. Clones can be freely used in parallel.
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    server: AssetServer,
    pub(crate) data: Arc<AssetProcessorData>,
}

/// Internal data stored inside an [`AssetProcessor`].
pub struct AssetProcessorData {
    /// The state of processing.
    pub(crate) processing_state: Arc<ProcessingState>,
    /// The factory that creates the transaction log.
    ///
    /// Note: we use a regular Mutex instead of an async mutex since we expect users to only set
    /// this once, and before the asset processor starts - there is no reason to await (and it
    /// avoids needing to use [`block_on`](bevy_tasks::block_on) to set the factory).
    log_factory: Mutex<Option<Box<dyn ProcessorTransactionLogFactory>>>,
    log: async_lock::RwLock<Option<Box<dyn ProcessorTransactionLog>>>,
    /// The processors that will be used to process assets.
    processors: RwLock<Processors>,
    sources: Arc<AssetSources>,
}

/// The current state of processing, including the overall state and the state of all assets.
pub(crate) struct ProcessingState {
    /// The overall state of processing.
    state: async_lock::RwLock<ProcessorState>,
    /// The channel to broadcast when the processor has completed initialization.
    initialized_sender: async_broadcast::Sender<()>,
    initialized_receiver: async_broadcast::Receiver<()>,
    /// The channel to broadcast when the processor has completed processing.
    finished_sender: async_broadcast::Sender<()>,
    finished_receiver: async_broadcast::Receiver<()>,
    /// The current state of the assets.
    asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
}

#[derive(Default)]
struct Processors {
    /// Maps the type path of the processor to its instance.
    type_path_to_processor: HashMap<&'static str, Arc<dyn ErasedProcessor>>,
    /// Maps the short type path of the processor to its instance.
    short_type_path_to_processor: HashMap<&'static str, ShortTypeProcessorEntry>,
    /// Maps the file extension of an asset to the type path of the processor we should use to
    /// process it by default.
    file_extension_to_default_processor: HashMap<Box<str>, &'static str>,
}

enum ShortTypeProcessorEntry {
    /// There is a unique processor with the given short type path.
    Unique {
        /// The full type path of the processor.
        type_path: &'static str,
        /// The processor itself.
        processor: Arc<dyn ErasedProcessor>,
    },
    /// There are (at least) two processors with the same short type path (storing the full type
    /// paths of all conflicting processors). Users must fully specify the type path in order to
    /// disambiguate.
    Ambiguous(Vec<&'static str>),
}

impl AssetProcessor {
    /// Creates a new [`AssetProcessor`] instance.
    pub fn new(
        sources: &mut AssetSourceBuilders,
        watch_processed: bool,
    ) -> (Self, Arc<AssetSources>) {
        let state = Arc::new(ProcessingState::new());
        let mut sources = sources.build_sources(true, watch_processed);
        sources.gate_on_processor(state.clone());
        let sources = Arc::new(sources);

        let data = Arc::new(AssetProcessorData::new(sources.clone(), state));
        // The asset processor uses its own asset server with its own id space
        let server = AssetServer::new_with_meta_check(
            sources.clone(),
            AssetServerMode::Processed,
            AssetMetaCheck::Always,
            false,
            UnapprovedPathMode::default(),
        );
        (Self { server, data }, sources)
    }

    /// Gets a reference to the [`Arc`] containing the [`AssetProcessorData`].
    pub fn data(&self) -> &Arc<AssetProcessorData> {
        &self.data
    }

    /// The "internal" [`AssetServer`] used by the [`AssetProcessor`]. This is _separate_ from the asset server used by
    /// the main App. It has different processor-specific configuration and a different ID space.
    pub fn server(&self) -> &AssetServer {
        &self.server
    }

    /// Retrieves the current [`ProcessorState`].
    pub async fn get_state(&self) -> ProcessorState {
        self.data.processing_state.get_state().await
    }

    /// Retrieves the [`AssetSource`] for this processor.
    #[inline]
    pub fn get_source<'a>(
        &self,
        id: impl Into<AssetSourceId<'a>>,
    ) -> Result<&AssetSource, MissingAssetSourceError> {
        self.data.sources.get(id.into())
    }

    /// Returns all of the [`AssetSources`] configured for this processor.
    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }

    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
    async fn log_unrecoverable(&self) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.unrecoverable()
            .await
            .map_err(|error| WriteLogError {
                log_entry: LogEntry::UnrecoverableError,
                error,
            })
            .unwrap();
    }

    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`],
    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.begin_processing(path)
            .await
            .map_err(|error| WriteLogError {
                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
                error,
            })
            .unwrap();
    }

    /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`].
    async fn log_end_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.end_processing(path)
            .await
            .map_err(|error| WriteLogError {
                log_entry: LogEntry::EndProcessing(path.clone_owned()),
                error,
            })
            .unwrap();
    }

    /// Starts the processor in a background thread.
    pub fn start(processor: Res<Self>) {
        let processor = processor.clone();
        IoTaskPool::get()
            .spawn(async move {
                let start_time = std::time::Instant::now();
                debug!("Processing Assets");

                processor.initialize().await.unwrap();

                let (new_task_sender, new_task_receiver) = async_channel::unbounded();
                processor
                    .queue_initial_processing_tasks(&new_task_sender)
                    .await;

                // Once all the tasks are queued for the initial processing, start actually
                // executing the tasks.
                {
                    let processor = processor.clone();
                    let new_task_sender = new_task_sender.clone();
                    IoTaskPool::get()
                        .spawn(async move {
                            processor
                                .execute_processing_tasks(new_task_sender, new_task_receiver)
                                .await;
                        })
                        .detach();
                }

                processor.data.wait_until_finished().await;

                let end_time = std::time::Instant::now();
                debug!("Processing finished in {:?}", end_time - start_time);

                debug!("Listening for changes to source assets");
                processor.spawn_source_change_event_listeners(&new_task_sender);
            })
            .detach();
    }

    /// Sends start task events for all assets in all processed sources into `sender`.
    async fn queue_initial_processing_tasks(
        &self,
        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        for source in self.sources().iter_processed() {
            self.queue_processing_tasks_for_folder(source, PathBuf::from(""), sender)
                .await
                .unwrap();
        }
    }

    /// Spawns listeners of change events for all asset sources which will start processor tasks in
    /// response.
    fn spawn_source_change_event_listeners(
        &self,
        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        for source in self.data.sources.iter_processed() {
            let Some(receiver) = source.event_receiver().cloned() else {
                continue;
            };
            let source_id = source.id();
            let processor = self.clone();
            let sender = sender.clone();
            IoTaskPool::get()
                .spawn(async move {
                    while let Ok(event) = receiver.recv().await {
                        let Ok(source) = processor.get_source(source_id.clone()) else {
                            return;
                        };
                        processor
                            .handle_asset_source_event(source, event, &sender)
                            .await;
                    }
                })
                .detach();
        }
    }

    /// Executes all tasks that come through `receiver`, and updates the processor's overall state
    /// based on task starts and ends.
    ///
    /// This future does not terminate until the channel is closed (not when the channel is empty).
    /// This means that in [`AssetProcessor::start`], this execution will continue even after all
    /// the initial tasks are processed.
    async fn execute_processing_tasks(
        &self,
        new_task_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
        new_task_receiver: async_channel::Receiver<(AssetSourceId<'static>, PathBuf)>,
    ) {
        // Convert the Sender into a WeakSender so that once all task producers terminate (and drop
        // their sender), this task doesn't keep itself alive. We still however need a way to get
        // the sender since processing tasks can start the tasks of dependent assets.
        let new_task_sender = {
            let weak_sender = new_task_sender.downgrade();
            drop(new_task_sender);
            weak_sender
        };

        // If there aren't any tasks in the channel the first time around, we should immediately go
        // to the finished state (otherwise we'd be sitting around stuck in the `Initialized`
        // state).
        if new_task_receiver.is_empty() {
            self.data
                .processing_state
                .set_state(ProcessorState::Finished)
                .await;
        }
        enum ProcessorTaskEvent {
            Start(AssetSourceId<'static>, PathBuf),
            Finished,
        }
        let (task_finished_sender, task_finished_receiver) = async_channel::unbounded::<()>();

        let mut pending_tasks = 0;
        while let Ok(event) = {
            // It's ok to use `select_biased` since we prefer to start tasks rather than finish tasks
            // anyway - since otherwise we might mark the processor as finished before all queued
            // tasks are done. `select_biased` also doesn't depend on `std` which is nice!
            select_biased! {
                result = new_task_receiver.recv().fuse() => {
                    result.map(|(source_id, path)| ProcessorTaskEvent::Start(source_id, path))
                },
                result = task_finished_receiver.recv().fuse() => {
                    result.map(|()| ProcessorTaskEvent::Finished)
                }
            }
        } {
            match event {
                ProcessorTaskEvent::Start(source_id, path) => {
                    let Some(new_task_sender) = new_task_sender.upgrade() else {
                        // If we can't upgrade the task sender, that means all sources of tasks
                        // (like the source event listeners) have been dropped. That means that the
                        // sources are no longer in the app, so reading/writing to them will
                        // probably not work, so ignoring the task is fine. This also likely means
                        // that the whole app is being dropped, so we can recover on the next
                        // initialization.
                        continue;
                    };
                    let processor = self.clone();
                    let task_finished_sender = task_finished_sender.clone();
                    pending_tasks += 1;
                    IoTaskPool::get()
                        .spawn(async move {
                            let Ok(source) = processor.get_source(source_id) else {
                                return;
                            };
                            processor.process_asset(source, path, new_task_sender).await;
                            // If the channel gets closed, that's ok. Just ignore it.
                            let _ = task_finished_sender.send(()).await;
                        })
                        .detach();
                    self.data
                        .processing_state
                        .set_state(ProcessorState::Processing)
                        .await;
                }
                ProcessorTaskEvent::Finished => {
                    pending_tasks -= 1;
                    if pending_tasks == 0 {
                        // clean up metadata in asset server
                        self.server.write_infos().consume_handle_drop_events();
                        self.data
                            .processing_state
                            .set_state(ProcessorState::Finished)
                            .await;
                    }
                }
            }
        }
    }

    /// Writes the default meta file for the provided `path`.
    ///
    /// This function generates the appropriate meta file to process `path` with the default
    /// processor. If there is no default processor, it falls back to the default loader.
    ///
    /// Note if there is already a meta file for `path`, this function returns
    /// `Err(WriteDefaultMetaError::MetaAlreadyExists)`.
    pub async fn write_default_meta_file_for_path(
        &self,
        path: impl Into<AssetPath<'_>>,
    ) -> Result<(), WriteDefaultMetaError> {
        let path = path.into();
        let Some(processor) = path
            .get_full_extension()
            .and_then(|extension| self.get_default_processor(&extension))
        else {
            return self
                .server
                .write_default_loader_meta_file_for_path(path)
                .await;
        };

        let meta = processor.default_meta();
        let serialized_meta = meta.serialize();

        let source = self.get_source(path.source())?;

        // Note: we get the reader rather than the processed reader, since we want to write the meta
        // file for the unprocessed version of that asset (so it will be processed by the default
        // processor).
        let reader = source.reader();
        match reader.read_meta_bytes(path.path()).await {
            Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
            Err(AssetReaderError::NotFound(_)) => {
                // The meta file couldn't be found so just fall through.
            }
            Err(AssetReaderError::Io(err)) => {
                return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
            }
            Err(AssetReaderError::HttpError(err)) => {
                return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
            }
        }

        let writer = source.writer()?;
        writer
            .write_meta_bytes(path.path(), &serialized_meta)
            .await?;

        Ok(())
    }

    async fn handle_asset_source_event(
        &self,
        source: &AssetSource,
        event: AssetSourceEvent,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        trace!("{event:?}");
        match event {
            AssetSourceEvent::AddedAsset(path)
            | AssetSourceEvent::AddedMeta(path)
            | AssetSourceEvent::ModifiedAsset(path)
            | AssetSourceEvent::ModifiedMeta(path) => {
                let _ = new_task_sender.send((source.id(), path)).await;
            }
            AssetSourceEvent::RemovedAsset(path) => {
                self.handle_removed_asset(source, path).await;
            }
            AssetSourceEvent::RemovedMeta(path) => {
                self.handle_removed_meta(source, path, new_task_sender)
                    .await;
            }
            AssetSourceEvent::AddedFolder(path) => {
                self.handle_added_folder(source, path, new_task_sender)
                    .await;
            }
            // NOTE: As a heads up for future devs: this event shouldn't be run in parallel with other events that might
            // touch this folder (ex: the folder might be re-created with new assets). Clean up the old state first.
            // Currently this event handler is not parallel, but it could be (and likely should be) in the future.
            AssetSourceEvent::RemovedFolder(path) => {
                self.handle_removed_folder(source, &path).await;
            }
            AssetSourceEvent::RenamedAsset { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset might need reprocessing.
                // Sometimes this event is returned when an asset is moved "back" into the asset folder
                if old == new {
                    let _ = new_task_sender.send((source.id(), new)).await;
                } else {
                    self.handle_renamed_asset(source, old, new, new_task_sender)
                        .await;
                }
            }
            AssetSourceEvent::RenamedMeta { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset meta might need reprocessing.
                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
                if old == new {
                    let _ = new_task_sender.send((source.id(), new)).await;
                } else {
                    debug!("Meta renamed from {old:?} to {new:?}");
                    // Renaming meta should not assume that an asset has also been renamed. Check both old and new assets to see
                    // if they should be re-imported (and/or have new meta generated)
                    let _ = new_task_sender.send((source.id(), old)).await;
                    let _ = new_task_sender.send((source.id(), new)).await;
                }
            }
            AssetSourceEvent::RenamedFolder { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset folder might need reprocessing.
                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
                if old == new {
                    self.handle_added_folder(source, new, new_task_sender).await;
                } else {
                    // PERF: this reprocesses everything in the moved folder. this is not necessary in most cases, but
                    // requires some nuance when it comes to path handling.
                    self.handle_removed_folder(source, &old).await;
                    self.handle_added_folder(source, new, new_task_sender).await;
                }
            }
            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
                let processed_reader = source.ungated_processed_reader().unwrap();
                match processed_reader.is_directory(&path).await {
                    Ok(is_directory) => {
                        if is_directory {
                            self.handle_removed_folder(source, &path).await;
                        } else if is_meta {
                            self.handle_removed_meta(source, path, new_task_sender)
                                .await;
                        } else {
                            self.handle_removed_asset(source, path).await;
                        }
                    }
                    Err(err) => {
                        match err {
                            AssetReaderError::NotFound(_) => {
                                // if the path is not found, a processed version does not exist
                            }
                            AssetReaderError::Io(err) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to the following error: {err}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                            AssetReaderError::HttpError(status) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                        }
                    }
                }
            }
        }
    }
582
async fn handle_added_folder(
583
&self,
584
source: &AssetSource,
585
path: PathBuf,
586
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
587
) {
588
debug!(
589
"Folder {} was added. Attempting to re-process",
590
AssetPath::from_path(&path).with_source(source.id())
591
);
592
self.queue_processing_tasks_for_folder(source, path, new_task_sender)
593
.await
594
.unwrap();
595
}
596
597
/// Responds to a removed meta event by reprocessing the asset at the given path.
598
async fn handle_removed_meta(
599
&self,
600
source: &AssetSource,
601
path: PathBuf,
602
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
603
) {
604
// If meta was removed, we might need to regenerate it.
605
// Likewise, the user might be manually re-adding the asset.
606
// Therefore, we shouldn't automatically delete the asset ... that is a
607
// user-initiated action.
608
debug!(
609
"Meta for asset {} was removed. Attempting to re-process",
610
AssetPath::from_path(&path).with_source(source.id())
611
);
612
let _ = new_task_sender.send((source.id(), path)).await;
613
}
614
615
/// Removes all processed assets stored at the given path (respecting transactionality), then removes the folder itself.
616
async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
617
debug!(
618
"Removing folder {} because source was removed",
619
path.display()
620
);
621
let processed_reader = source.ungated_processed_reader().unwrap();
622
match processed_reader.read_directory(path).await {
623
Ok(mut path_stream) => {
624
while let Some(child_path) = path_stream.next().await {
625
self.handle_removed_asset(source, child_path).await;
626
}
627
}
628
Err(err) => match err {
629
AssetReaderError::NotFound(_err) => {
630
// The processed folder does not exist. No need to update anything
631
}
632
AssetReaderError::HttpError(status) => {
633
self.log_unrecoverable().await;
634
error!(
635
"Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
636
in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
637
);
638
}
639
AssetReaderError::Io(err) => {
640
self.log_unrecoverable().await;
641
error!(
642
"Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
643
in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
644
);
645
}
646
},
647
}
648
let processed_writer = source.processed_writer().unwrap();
649
if let Err(err) = processed_writer.remove_directory(path).await {
650
match err {
651
AssetWriterError::Io(err) => {
652
// we can ignore NotFound because if the "final" file in a folder was removed
653
// then we automatically clean up this folder
654
if err.kind() != ErrorKind::NotFound {
655
let asset_path = AssetPath::from_path(path).with_source(source.id());
656
error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
657
}
658
}
659
}
660
}
661
}
662
663
/// Removes the processed version of an asset and associated in-memory metadata. This will block until all existing reads/writes to the
664
/// asset have finished, thanks to the `file_transaction_lock`.
665
async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
666
let asset_path = AssetPath::from(path).with_source(source.id());
667
debug!("Removing processed {asset_path} because source was removed");
668
let lock = {
669
// Scope the infos lock so we don't hold up other processing for too long.
670
let mut infos = self.data.processing_state.asset_infos.write().await;
671
infos.remove(&asset_path).await
672
};
673
let Some(lock) = lock else {
674
return;
675
};
676
677
// we must wait for uncontested write access to the asset source to ensure existing
678
// readers/writers can finish their operations
679
let _write_lock = lock.write();
680
self.remove_processed_asset_and_meta(source, asset_path.path())
681
.await;
682
}
683
684
/// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
685
/// This will cause direct path dependencies to break.
686
async fn handle_renamed_asset(
687
&self,
688
source: &AssetSource,
689
old: PathBuf,
690
new: PathBuf,
691
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
692
) {
693
let old = AssetPath::from(old).with_source(source.id());
694
let new = AssetPath::from(new).with_source(source.id());
695
let processed_writer = source.processed_writer().unwrap();
696
let result = {
697
// Scope the infos lock so we don't hold up other processing for too long.
698
let mut infos = self.data.processing_state.asset_infos.write().await;
699
infos.rename(&old, &new, new_task_sender).await
700
};
701
let Some((old_lock, new_lock)) = result else {
702
return;
703
};
704
// we must wait for uncontested write access to both assets to ensure existing
705
// readers/writers can finish their operations
706
let _old_write_lock = old_lock.write();
707
let _new_write_lock = new_lock.write();
708
processed_writer
709
.rename(old.path(), new.path())
710
.await
711
.unwrap();
712
processed_writer
713
.rename_meta(old.path(), new.path())
714
.await
715
.unwrap();
716
}
717
718
async fn queue_processing_tasks_for_folder(
719
&self,
720
source: &AssetSource,
721
path: PathBuf,
722
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
723
) -> Result<(), AssetReaderError> {
724
if source.reader().is_directory(&path).await? {
725
let mut path_stream = source.reader().read_directory(&path).await?;
726
while let Some(path) = path_stream.next().await {
727
Box::pin(self.queue_processing_tasks_for_folder(source, path, new_task_sender))
728
.await?;
729
}
730
} else {
731
let _ = new_task_sender.send((source.id(), path)).await;
732
}
733
Ok(())
734
}
735
736
/// Register a new asset processor.
737
pub fn register_processor<P: Process>(&self, processor: P) {
738
let mut processors = self
739
.data
740
.processors
741
.write()
742
.unwrap_or_else(PoisonError::into_inner);
743
let processor = Arc::new(processor);
744
processors
745
.type_path_to_processor
746
.insert(P::type_path(), processor.clone());
747
match processors
748
.short_type_path_to_processor
749
.entry(P::short_type_path())
750
{
751
Entry::Vacant(entry) => {
752
entry.insert(ShortTypeProcessorEntry::Unique {
753
type_path: P::type_path(),
754
processor,
755
});
756
}
757
Entry::Occupied(mut entry) => match entry.get_mut() {
758
ShortTypeProcessorEntry::Unique { type_path, .. } => {
759
let type_path = *type_path;
760
*entry.get_mut() =
761
ShortTypeProcessorEntry::Ambiguous(vec![type_path, P::type_path()]);
762
}
763
ShortTypeProcessorEntry::Ambiguous(type_paths) => {
764
type_paths.push(P::type_path());
765
}
766
},
767
}
768
}
769
770
/// Set the default processor for the given `extension`. Make sure `P` is registered with [`AssetProcessor::register_processor`].
771
pub fn set_default_processor<P: Process>(&self, extension: &str) {
772
let mut processors = self
773
.data
774
.processors
775
.write()
776
.unwrap_or_else(PoisonError::into_inner);
777
processors
778
.file_extension_to_default_processor
779
.insert(extension.into(), P::type_path());
780
}
781
782
/// Returns the default processor for the given `extension`, if it exists.
783
pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
784
let processors = self
785
.data
786
.processors
787
.read()
788
.unwrap_or_else(PoisonError::into_inner);
789
let key = processors
790
.file_extension_to_default_processor
791
.get(extension)?;
792
processors.type_path_to_processor.get(key).cloned()
793
}
794
795
/// Returns the processor with the given `processor_type_name`, if it exists.
796
pub fn get_processor(
797
&self,
798
processor_type_name: &str,
799
) -> Result<Arc<dyn ErasedProcessor>, GetProcessorError> {
800
let processors = self
801
.data
802
.processors
803
.read()
804
.unwrap_or_else(PoisonError::into_inner);
805
if let Some(short_type_processor) = processors
806
.short_type_path_to_processor
807
.get(processor_type_name)
808
{
809
return match short_type_processor {
810
ShortTypeProcessorEntry::Unique { processor, .. } => Ok(processor.clone()),
811
ShortTypeProcessorEntry::Ambiguous(examples) => Err(GetProcessorError::Ambiguous {
812
processor_short_name: processor_type_name.to_owned(),
813
ambiguous_processor_names: examples.clone(),
814
}),
815
};
816
}
817
processors
818
.type_path_to_processor
819
.get(processor_type_name)
820
.cloned()
821
.ok_or_else(|| GetProcessorError::Missing(processor_type_name.to_owned()))
822
}
823
824
    /// Populates the initial view of each asset by scanning the unprocessed and processed asset folders.
    /// This info will later be used to determine whether or not to re-process an asset.
    ///
    /// This will validate transactions and recover failed transactions when necessary.
    async fn initialize(&self) -> Result<(), InitializeError> {
        self.validate_transaction_log_and_recover().await;
        let mut asset_infos = self.data.processing_state.asset_infos.write().await;

        /// Retrieves asset paths recursively. If `empty_dirs` is `Some`, it will be populated with
        /// empty folders as they are discovered, so they can be cleaned up later.
        async fn get_asset_paths(
            reader: &dyn ErasedAssetReader,
            path: PathBuf,
            paths: &mut Vec<PathBuf>,
            mut empty_dirs: Option<&mut Vec<PathBuf>>,
        ) -> Result<bool, AssetReaderError> {
            if reader.is_directory(&path).await? {
                let mut path_stream = reader.read_directory(&path).await?;
                let mut contains_files = false;

                while let Some(child_path) = path_stream.next().await {
                    contains_files |= Box::pin(get_asset_paths(
                        reader,
                        child_path,
                        paths,
                        empty_dirs.as_deref_mut(),
                    ))
                    .await?;
                }
                // Add the current directory after all its subdirectories so we delete any empty
                // subdirectories before the current directory.
                if !contains_files
                    && path.parent().is_some()
                    && let Some(empty_dirs) = empty_dirs
                {
                    empty_dirs.push(path);
                }
                Ok(contains_files)
            } else {
                paths.push(path);
                Ok(true)
            }
        }

        for source in self.sources().iter_processed() {
            let Some(processed_reader) = source.ungated_processed_reader() else {
                continue;
            };
            let Ok(processed_writer) = source.processed_writer() else {
                continue;
            };
            let mut unprocessed_paths = Vec::new();
            get_asset_paths(
                source.reader(),
                PathBuf::from(""),
                &mut unprocessed_paths,
                None,
            )
            .await
            .map_err(InitializeError::FailedToReadSourcePaths)?;

            let mut processed_paths = Vec::new();
            let mut empty_dirs = Vec::new();
            get_asset_paths(
                processed_reader,
                PathBuf::from(""),
                &mut processed_paths,
                Some(&mut empty_dirs),
            )
            .await
            .map_err(InitializeError::FailedToReadDestinationPaths)?;

            // Remove any empty directories from the processed path. Note: this has to happen after
            // we fetch all the paths, otherwise the path stream can skip over paths
            // (we're modifying a collection while iterating through it).
            for empty_dir in empty_dirs {
                // We don't care if this succeeds, since it's just a best-effort cleanup task.
                let _ = processed_writer.remove_empty_directory(&empty_dir).await;
            }

            for path in unprocessed_paths {
                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
            }

            for path in processed_paths {
                let mut dependencies = Vec::new();
                let asset_path = AssetPath::from(path).with_source(source.id());
                if let Some(info) = asset_infos.get_mut(&asset_path) {
                    match processed_reader.read_meta_bytes(asset_path.path()).await {
                        Ok(meta_bytes) => {
                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
                                Ok(minimal) => {
                                    trace!(
                                        "Populated processed info for asset {asset_path} {:?}",
                                        minimal.processed_info
                                    );

                                    if let Some(processed_info) = &minimal.processed_info {
                                        for process_dependency_info in
                                            &processed_info.process_dependencies
                                        {
                                            dependencies.push(process_dependency_info.path.clone());
                                        }
                                    }
                                    info.processed_info = minimal.processed_info;
                                }
                                Err(err) => {
                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
                                    self.remove_processed_asset_and_meta(source, asset_path.path())
                                        .await;
                                }
                            }
                        }
                        Err(err) => {
                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
                            self.remove_processed_asset_and_meta(source, asset_path.path())
                                .await;
                        }
                    }
                } else {
                    trace!("Removing processed data for non-existent asset {asset_path}");
                    self.remove_processed_asset_and_meta(source, asset_path.path())
                        .await;
                }

                for dependency in dependencies {
                    asset_infos.add_dependent(&dependency, asset_path.clone());
                }
            }
        }

        self.data
            .processing_state
            .set_state(ProcessorState::Processing)
            .await;

        Ok(())
    }

    /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor
    /// does it remove existing in-memory metadata.
    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
            warn!("Failed to remove non-existent asset {path:?}: {err}");
        }

        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
            warn!("Failed to remove non-existent meta {path:?}: {err}");
        }

        self.clean_empty_processed_ancestor_folders(source, path)
            .await;
    }

    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
        // As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder
        if path.is_absolute() {
            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
            return;
        }
        // Walk up the ancestors of `path`, removing each directory if (and only if) it is empty.
        let mut current = path.to_path_buf();
        while let Some(parent) = current.parent().map(Path::to_path_buf) {
            if parent == Path::new("") {
                break;
            }
            if source
                .processed_writer()
                .unwrap()
                .remove_empty_directory(&parent)
                .await
                .is_err()
            {
                // if we fail to delete a folder, stop walking up the tree
                break;
            }
            current = parent;
        }
    }

    /// Processes the asset (if it has not already been processed or the asset source has changed).
    /// If the asset has "process dependencies" (relies on the values of other assets), it will asynchronously await until
    /// the dependencies have been processed (See [`ProcessorGatedReader`], which is used in the [`AssetProcessor`]'s [`AssetServer`]
    /// to block reads until the asset is processed).
    ///
    /// [`ProcessorGatedReader`]: crate::io::processor_gated::ProcessorGatedReader
    async fn process_asset(
        &self,
        source: &AssetSource,
        path: PathBuf,
        processor_task_event: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        let result = self.process_asset_internal(source, &asset_path).await;
        let mut infos = self.data.processing_state.asset_infos.write().await;
        infos
            .finish_processing(asset_path, result, processor_task_event)
            .await;
    }

    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        // TODO: check if already processing to protect against duplicate hot-reload events
        debug!("Processing {}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self.get_processor(&processor)?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let new_hash = {
            // Create a reader just for computing the hash. Keep this scoped here so that we drop it
            // as soon as the hash is computed.
            let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;

            get_asset_hash(&meta_bytes, &mut reader_for_hash)
                .await
                .map_err(reader_err)?
        };
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            let infos = self.data.processing_state.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
                && current_processed_info.hash == new_hash
            {
                let mut dependency_changed = false;
                for current_dep_info in &current_processed_info.process_dependencies {
                    let live_hash = infos
                        .get(&current_dep_info.path)
                        .and_then(|i| i.processed_info.as_ref())
                        .map(|i| i.full_hash);
                    if live_hash != Some(current_dep_info.full_hash) {
                        dependency_changed = true;
                        break;
                    }
                }
                if !dependency_changed {
                    return Ok(ProcessResult::SkippedNotChanged);
                }
            }
        }

        // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
        // See ProcessedAssetInfo::file_transaction_lock docs for more info
        let _transaction_lock = {
            let lock = {
                let mut infos = self.data.processing_state.asset_infos.write().await;
                let info = infos.get_or_insert(asset_path.clone());
                // Clone out the transaction lock first and then lock after we've dropped the
                // asset_infos. Otherwise, trying to lock a single path can block all other paths too
                // (leading to deadlocks).
                info.file_transaction_lock.clone()
            };
            lock.write_arc().await
        };

        // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
        // Directly writing to the asset destination in the processor necessitates this behavior
        // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures
        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            // Unwrap is ok since we have a processor, so the `AssetAction` must have been
            // `AssetAction::Process` (which includes its settings).
            let settings = source_meta.process_settings().unwrap();

            // Create a reader just for the actual process. Note: this means that we're performing
            // two reads for the same file (but we avoid having to load the whole file into memory).
            // For some sources (like local file systems), this is not a big deal, but for other
            // sources like HTTP asset sources, this could be an entire additional download (if
            // the asset source doesn't do any caching). In practice, most sources being processed
            // are likely to be local, and processing in general is a publish-time operation, so
            // it's not likely to be too big a deal. If in the future we decide we want to avoid
            // this repeated read, we could "ask" the asset source if it prefers avoiding repeated
            // reads or not.
            let reader_for_process = reader.read(path).await.map_err(reader_err)?;

            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context = ProcessContext::new(
                    self,
                    asset_path,
                    reader_for_process,
                    &mut new_processed_info,
                );
                let process = processor.process(&mut context, settings, &mut *writer);
                #[cfg(feature = "trace")]
                let process = {
                    let span = info_span!(
                        "asset processing",
                        processor = processor.type_path(),
                        asset = asset_path.to_string(),
                    );
                    process.instrument(span)
                };
                process.await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();

            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            // See the reasoning above (in the `processor` branch) for why it's ok to do a second read here.
            let mut reader_for_copy = reader.read(path).await.map_err(reader_err)?;
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            futures_lite::io::copy(&mut reader_for_copy, &mut writer)
                .await
                .map_err(|err| ProcessError::AssetWriterError {
                    path: asset_path.clone_owned(),
                    err: err.into(),
                })?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }

    async fn validate_transaction_log_and_recover(&self) {
        let log_factory = self
            .data
            .log_factory
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            // Take the log factory to indicate we've started; this disables setting a new
            // log factory.
            .take()
            .expect("the asset processor only starts once");
        if let Err(err) = validate_transaction_log(log_factory.as_ref()).await {
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated: {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                for source in self.sources().iter_processed() {
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting the imported asset folder manually. {err}");
                    }
                }
            }
        }
        let mut log = self.data.log.write().await;
        *log = match log_factory.create_new_log().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting the processed asset folder. {}", err),
        };
    }
}

impl AssetProcessorData {
    /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
    pub(crate) fn new(sources: Arc<AssetSources>, processing_state: Arc<ProcessingState>) -> Self {
        AssetProcessorData {
            processing_state,
            sources,
            log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))),
            log: Default::default(),
            processors: Default::default(),
        }
    }

    /// Sets the transaction log factory for the processor.
    ///
    /// If this is called after asset processing has begun (in the `Startup` schedule), it will
    /// return an error. If not called, the default transaction log will be used.
    pub fn set_log_factory(
        &self,
        factory: Box<dyn ProcessorTransactionLogFactory>,
    ) -> Result<(), SetTransactionLogFactoryError> {
        let mut log_factory = self
            .log_factory
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        if log_factory.is_none() {
            // This indicates the asset processor has already started, so setting the factory does
            // nothing here.
            return Err(SetTransactionLogFactoryError::AlreadyInUse);
        }

        *log_factory = Some(factory);
        Ok(())
    }

/// Returns a future that will not finish until the path has been processed.
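    ///
    /// A minimal sketch of awaiting a specific asset (the `"textures/player.png"` path is
    /// hypothetical):
    ///
    /// ```ignore
    /// let status = processor_data
    ///     .wait_until_processed(AssetPath::from("textures/player.png"))
    ///     .await;
    /// ```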
    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
        self.processing_state.wait_until_processed(path).await
    }

    /// Returns a future that will not finish until the processor has been initialized.
    pub async fn wait_until_initialized(&self) {
        self.processing_state.wait_until_initialized().await;
    }

    /// Returns a future that will not finish until processing has finished.
    pub async fn wait_until_finished(&self) {
        self.processing_state.wait_until_finished().await;
    }
}

impl ProcessingState {
    /// Creates a new empty processing state.
    fn new() -> Self {
        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
        // Allow overflow on these "one slot" channels so receivers retrieve the "latest" state,
        // and so senders don't block if older state was present.
        initialized_sender.set_overflow(true);
        finished_sender.set_overflow(true);

        Self {
            state: async_lock::RwLock::new(ProcessorState::Initializing),
            initialized_sender,
            initialized_receiver,
            finished_sender,
            finished_receiver,
            asset_infos: Default::default(),
        }
    }

    /// Sets the overall state of processing and broadcasts appropriate events.
    async fn set_state(&self, state: ProcessorState) {
        let mut state_guard = self.state.write().await;
        let last_state = *state_guard;
        *state_guard = state;
        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
            self.finished_sender.broadcast(()).await.unwrap();
        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
            self.initialized_sender.broadcast(()).await.unwrap();
        }
    }

    /// Retrieves the current [`ProcessorState`].
    pub(crate) async fn get_state(&self) -> ProcessorState {
        *self.state.read().await
    }

    /// Gets a "transaction lock" that can be used to ensure no writes to the asset or asset meta occur
    /// while it is held.
    pub(crate) async fn get_transaction_lock(
        &self,
        path: &AssetPath<'static>,
    ) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
        let lock = {
            let infos = self.asset_infos.read().await;
            let info = infos
                .get(path)
                .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
            // Clone out the transaction lock first and then lock after we've dropped the
            // asset_infos. Otherwise, trying to lock a single path can block all other paths too
            // (leading to deadlocks).
            info.file_transaction_lock.clone()
        };
        Ok(lock.read_arc().await)
    }

    /// Returns a future that will not finish until the path has been processed.
    pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
        self.wait_until_initialized().await;
        let mut receiver = {
            let infos = self.asset_infos.write().await;
            let info = infos.get(&path);
            match info {
                Some(info) => match info.status {
                    Some(result) => return result,
                    // This receiver must be created prior to losing the lock to ensure this is transactional
                    None => info.status_receiver.clone(),
                },
                None => return ProcessStatus::NonExistent,
            }
        };
        receiver.recv().await.unwrap()
    }

    /// Returns a future that will not finish until the processor has been initialized.
    pub(crate) async fn wait_until_initialized(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing => {
                    // This receiver must be created prior to losing the read lock to ensure this is transactional
                    Some(self.initialized_receiver.clone())
                }
                _ => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }

    /// Returns a future that will not finish until processing has finished.
    pub(crate) async fn wait_until_finished(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing | ProcessorState::Processing => {
                    // This receiver must be created prior to losing the read lock to ensure this is transactional
                    Some(self.finished_receiver.clone())
                }
                ProcessorState::Finished => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }
}

/// The (successful) result of processing an asset
#[derive(Debug, Clone)]
pub enum ProcessResult {
    Processed(ProcessedInfo),
    SkippedNotChanged,
    Ignored,
}

/// The final status of processing an asset
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    Processed,
    Failed,
    NonExistent,
}

// NOTE: if you add new fields to this struct, make sure they are propagated (when relevant) in ProcessorAssetInfos::rename
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    processed_info: Option<ProcessedInfo>,
    /// Paths of assets that depend on this asset when they are being processed.
    dependents: HashSet<AssetPath<'static>>,
    status: Option<ProcessStatus>,
    /// A lock that controls read/write access to processed asset files. The lock is shared for both the asset bytes and the meta bytes.
    /// _This lock must be locked whenever a read or write to processed assets occurs._
    /// There are scenarios where processed assets (and their metadata) are being read and written in multiple places at once:
    /// * when the processor is running in parallel with an app
    /// * when processing assets in parallel, the processor might read an asset's `process_dependencies` when processing new versions of those dependencies
    /// * this second scenario almost certainly isn't possible with the current implementation, but it's worth protecting against
    ///
    /// This lock defends against those scenarios by ensuring readers don't read while processed files are being written.
    /// Because this lock is shared across meta and asset bytes, readers can also ensure they don't read "old" versions of metadata with "new" asset data.
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    status_sender: async_broadcast::Sender<ProcessStatus>,
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}

impl Default for ProcessorAssetInfo {
    fn default() -> Self {
        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
        // Allow overflow on this "one slot" channel so receivers retrieve the "latest" state,
        // and so the sender doesn't block if older state was present.
        status_sender.set_overflow(true);
        Self {
            processed_info: Default::default(),
            dependents: Default::default(),
            file_transaction_lock: Default::default(),
            status: None,
            status_sender,
            status_receiver,
        }
    }
}

impl ProcessorAssetInfo {
    async fn update_status(&mut self, status: ProcessStatus) {
        if self.status != Some(status) {
            self.status = Some(status);
            self.status_sender.broadcast(status).await.unwrap();
        }
    }
}

/// The "current" in memory view of the asset space. This is "eventually consistent". It does not directly
/// represent the state of assets in storage, but rather a valid historical view that will gradually become more
/// consistent as events are processed.
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    /// The "current" in memory view of the asset space. During processing, if a path does not exist in this, it should
    /// be considered non-existent.
    /// NOTE: YOU MUST USE `Self::get_or_insert` or `Self::insert` TO ADD ITEMS TO THIS COLLECTION TO ENSURE
    /// `non_existent_dependents` DATA IS CONSUMED
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    /// Dependents for assets that don't exist. This exists to track "dangling" asset references due to deleted / missing files.
    /// If the dependent asset is added, it can "resolve" these dependencies and re-compute those assets.
    /// Therefore this _must_ always be consistent with the `infos` data. If a new asset is added to `infos`, it should
    /// check this map for dependencies and add them. If an asset is removed, it should update the dependents here.
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
}

impl ProcessorAssetInfos {
1570
fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1571
self.infos.entry(asset_path.clone()).or_insert_with(|| {
1572
let mut info = ProcessorAssetInfo::default();
1573
// track existing dependents by resolving existing "hanging" dependents.
1574
if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
1575
info.dependents = dependents;
1576
}
1577
info
1578
})
1579
}
1580
1581
pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
1582
self.infos.get(asset_path)
1583
}
1584
1585
fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
1586
self.infos.get_mut(asset_path)
1587
}
1588
1589
fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
1590
if let Some(info) = self.get_mut(asset_path) {
1591
info.dependents.insert(dependent);
1592
} else {
1593
let dependents = self
1594
.non_existent_dependents
1595
.entry(asset_path.clone())
1596
.or_default();
1597
dependents.insert(dependent);
1598
}
1599
}
1600
1601
/// Finalize processing the asset, which will incorporate the result of the processed asset into the in-memory view the processed assets.
1602
    async fn finish_processing(
        &mut self,
        asset_path: AssetPath<'static>,
        result: Result<ProcessResult, ProcessError>,
        reprocess_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        match result {
            Ok(ProcessResult::Processed(processed_info)) => {
                debug!("Finished processing \"{}\"", asset_path);
                // Clean up old dependents.
                let old_processed_info = self
                    .infos
                    .get_mut(&asset_path)
                    .and_then(|i| i.processed_info.take());
                if let Some(old_processed_info) = old_processed_info {
                    self.clear_dependencies(&asset_path, old_processed_info);
                }

                // Populate new dependents.
                for process_dependency_info in &processed_info.process_dependencies {
                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
                }
                let info = self.get_or_insert(asset_path);
                info.processed_info = Some(processed_info);
                info.update_status(ProcessStatus::Processed).await;
                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
                for path in dependents {
                    let _ = reprocess_sender
                        .send((path.source().clone_owned(), path.path().to_owned()))
                        .await;
                }
            }
            Ok(ProcessResult::SkippedNotChanged) => {
                debug!("Skipping processing (unchanged) \"{}\"", asset_path);
                let info = self.get_mut(&asset_path).expect("info should exist");
                // NOTE: Skipping an asset on a given pass doesn't mean it won't change in the future as a
                // result of a dependency being reprocessed, so apps might receive an "old" (but valid) asset
                // first. This is in the interest of fast startup times that don't block on every asset being
                // checked and reprocessed. It therefore relies on hot reloading in the app to pick up the
                // "latest" version of the asset. If "block until the latest state is reflected" is ever
                // required, a less granular "block until the first pass has finished" mode could easily be added.
                info.update_status(ProcessStatus::Processed).await;
            }
            Ok(ProcessResult::Ignored) => {
                debug!("Skipping processing (ignored) \"{}\"", asset_path);
            }
            Err(ProcessError::ExtensionRequired) => {
                // Skip assets without extensions.
            }
            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
                trace!("No loader found for {asset_path}");
            }
            Err(ProcessError::AssetReaderError {
                err: AssetReaderError::NotFound(_),
                ..
            }) => {
                // If there is no asset source, no processing can be done.
                trace!("No need to process asset {asset_path} because it does not exist");
            }
            Err(err) => {
                error!("Failed to process asset {asset_path}: {err}");
                // If this failed because a dependency could not be loaded, make sure this asset is
                // reprocessed if that dependency is reprocessed.
                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
                    err
                {
                    let info = self.get_mut(&asset_path).expect("info should exist");
                    info.processed_info = Some(ProcessedInfo {
                        hash: AssetHash::default(),
                        full_hash: AssetHash::default(),
                        process_dependencies: vec![],
                    });
                    self.add_dependent(dependency.path(), asset_path.to_owned());
                }

                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Failed).await;
            }
        }
    }

    /// Removes the info for the given path. This should only happen if an asset's source is
    /// removed/non-existent. Returns the transaction lock for the asset, or [`None`] if the asset
    /// path does not exist.
    async fn remove(
        &mut self,
        asset_path: &AssetPath<'static>,
    ) -> Option<Arc<async_lock::RwLock<()>>> {
        let info = self.infos.remove(asset_path)?;
        if let Some(processed_info) = info.processed_info {
            self.clear_dependencies(asset_path, processed_info);
        }
        // Tell all listeners this asset does not exist.
        info.status_sender
            .broadcast(ProcessStatus::NonExistent)
            .await
            .unwrap();
        if !info.dependents.is_empty() {
            error!(
                "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                info.dependents
            );
            self.non_existent_dependents
                .insert(asset_path.clone(), info.dependents);
        }

        Some(info.file_transaction_lock)
    }

    /// Removes the info for the old path and moves it over to the new path. This should only
    /// happen if an asset is renamed/moved. Returns the transaction locks for the old and new
    /// assets respectively, or [`None`] if the old path does not exist.
    async fn rename(
        &mut self,
        old: &AssetPath<'static>,
        new: &AssetPath<'static>,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
        let mut info = self.infos.remove(old)?;
        if !info.dependents.is_empty() {
            // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
            // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
            // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
            // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
            // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
            // TODO: it would be nice to log an error here only for dependents that aren't also being moved + fixed
            // (see the remove impl).
            error!(
                "The asset at {old} was renamed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                info.dependents
            );
            self.non_existent_dependents
                .insert(old.clone(), core::mem::take(&mut info.dependents));
        }
        if let Some(processed_info) = &info.processed_info {
            // Update the "dependent" lists of this asset's "process dependencies" to use the new path.
            for dep in &processed_info.process_dependencies {
                if let Some(info) = self.infos.get_mut(&dep.path) {
                    info.dependents.remove(old);
                    info.dependents.insert(new.clone());
                } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
                    dependents.remove(old);
                    dependents.insert(new.clone());
                }
            }
        }
        // Tell all listeners this asset no longer exists.
        info.status_sender
            .broadcast(ProcessStatus::NonExistent)
            .await
            .unwrap();
        let new_info = self.get_or_insert(new.clone());
        new_info.processed_info = info.processed_info;
        new_info.status = info.status;
        // Ensure anything waiting on the new path is informed of the status of this asset.
        if let Some(status) = new_info.status {
            new_info.status_sender.broadcast(status).await.unwrap();
        }
        let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
        // Queue the asset for a reprocess check, in case it needs new meta.
        let _ = new_task_sender
            .send((new.source().clone_owned(), new.path().to_owned()))
            .await;
        for dependent in dependents {
            // Queue dependents for reprocessing because they might have been waiting for this asset.
            let _ = new_task_sender
                .send((
                    dependent.source().clone_owned(),
                    dependent.path().to_owned(),
                ))
                .await;
        }

        Some((
            info.file_transaction_lock,
            new_info.file_transaction_lock.clone(),
        ))
    }

    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
        for old_load_dep in removed_info.process_dependencies {
            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
                info.dependents.remove(asset_path);
            } else if let Some(dependents) =
                self.non_existent_dependents.get_mut(&old_load_dep.path)
            {
                dependents.remove(asset_path);
            }
        }
    }
}

/// The current state of the [`AssetProcessor`].
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
    /// The processor is still initializing, which involves scanning the current asset folders,
    /// constructing an in-memory view of the asset space, recovering from previous errors / crashes,
    /// and cleaning up old / unused assets.
    Initializing,
    /// The processor is currently processing assets.
    Processing,
    /// The processor has finished processing all valid assets and reporting invalid assets.
    Finished,
}

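The lifecycle described by these variants is strictly forward-moving: the processor initializes, processes, and finishes. A minimal standalone sketch of that one-way progression (the `advance` function is hypothetical, mirroring the enum above rather than any crate API):

```rust
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum ProcessorState {
    Initializing,
    Processing,
    Finished,
}

// Hypothetical forward-only transition: scan/recover, then process, then stay finished.
fn advance(state: ProcessorState) -> ProcessorState {
    match state {
        ProcessorState::Initializing => ProcessorState::Processing,
        ProcessorState::Processing => ProcessorState::Finished,
        ProcessorState::Finished => ProcessorState::Finished,
    }
}

fn main() {
    let s = advance(ProcessorState::Initializing);
    assert_eq!(s, ProcessorState::Processing);
    assert_eq!(advance(s), ProcessorState::Finished);
    // Finished is terminal for a given pass.
    assert_eq!(advance(ProcessorState::Finished), ProcessorState::Finished);
    println!("ok");
}
```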
/// An error that occurs when initializing the [`AssetProcessor`].
#[derive(Error, Debug)]
pub enum InitializeError {
    #[error(transparent)]
    FailedToReadSourcePaths(AssetReaderError),
    #[error(transparent)]
    FailedToReadDestinationPaths(AssetReaderError),
    #[error("Failed to validate asset log: {0}")]
    ValidateLogError(#[from] ValidateLogError),
}

/// An error when attempting to set the transaction log factory.
#[derive(Error, Debug)]
pub enum SetTransactionLogFactoryError {
    #[error("Transaction log is already in use so setting the factory does nothing")]
    AlreadyInUse,
}

/// An error when retrieving an asset processor.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum GetProcessorError {
    #[error("The processor '{0}' does not exist")]
    Missing(String),
    #[error("The processor '{processor_short_name}' is ambiguous between several processors: {ambiguous_processor_names:?}")]
    Ambiguous {
        processor_short_name: String,
        ambiguous_processor_names: Vec<&'static str>,
    },
}

impl From<GetProcessorError> for ProcessError {
    fn from(err: GetProcessorError) -> Self {
        match err {
            GetProcessorError::Missing(name) => Self::MissingProcessor(name),
            GetProcessorError::Ambiguous {
                processor_short_name,
                ambiguous_processor_names,
            } => Self::AmbiguousProcessor {
                processor_short_name,
                ambiguous_processor_names,
            },
        }
    }
}

#[cfg(test)]
mod tests;