//! Background asset processing: reads source assets, processes them, and writes
//! the results (plus their final `.meta` files) to a "processed" destination,
//! tracking process dependencies so dependents are reprocessed when inputs change.

mod log;
mod process;
pub use log::*;
pub use process::*;
use crate::{
io::{
AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
MissingAssetSourceError,
},
meta::{
get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
},
AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
};
use alloc::{borrow::ToOwned, boxed::Box, collections::VecDeque, sync::Arc, vec, vec::Vec};
use bevy_ecs::prelude::*;
use bevy_platform::collections::{HashMap, HashSet};
use bevy_tasks::IoTaskPool;
use futures_io::ErrorKind;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use parking_lot::RwLock;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tracing::{debug, error, trace, warn};
#[cfg(feature = "trace")]
use {
alloc::string::ToString,
bevy_tasks::ConditionalSendFuture,
tracing::{info_span, instrument::Instrument},
};
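/// A "background" asset processor that reads assets from a source [`AssetSource`]
/// (an [`ErasedAssetReader`] / [`ErasedAssetWriter`] pair), processes them, and
/// writes the results (plus their final `.meta` files) to that source's
/// "processed" destination. Process dependencies are tracked so that dependents
/// are queued for reprocessing when a dependency changes.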
#[derive(Resource, Clone)]
pub struct AssetProcessor {
server: AssetServer,
pub(crate) data: Arc<AssetProcessorData>,
}
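/// The internal, shared state of an [`AssetProcessor`], kept behind an [`Arc`] so
/// it can be accessed from processing tasks and the [`AssetServer`].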
pub struct AssetProcessorData {
pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
default_processors: RwLock<HashMap<Box<str>, &'static str>>,
state: async_lock::RwLock<ProcessorState>,
sources: AssetSources,
initialized_sender: async_broadcast::Sender<()>,
initialized_receiver: async_broadcast::Receiver<()>,
finished_sender: async_broadcast::Sender<()>,
finished_receiver: async_broadcast::Receiver<()>,
}
impl AssetProcessor {
pub fn new(source: &mut AssetSourceBuilders) -> Self {
let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
let mut sources = source.build_sources(false, false);
sources.gate_on_processor(data.clone());
let server = AssetServer::new_with_meta_check(
sources,
AssetServerMode::Processed,
AssetMetaCheck::Always,
false,
UnapprovedPathMode::default(),
);
Self { server, data }
}
pub fn data(&self) -> &Arc<AssetProcessorData> {
&self.data
}
pub fn server(&self) -> &AssetServer {
&self.server
}
async fn set_state(&self, state: ProcessorState) {
let mut state_guard = self.data.state.write().await;
let last_state = *state_guard;
*state_guard = state;
if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
self.data.finished_sender.broadcast(()).await.unwrap();
} else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
self.data.initialized_sender.broadcast(()).await.unwrap();
}
}
pub async fn get_state(&self) -> ProcessorState {
*self.data.state.read().await
}
#[inline]
pub fn get_source<'a>(
&self,
id: impl Into<AssetSourceId<'a>>,
) -> Result<&AssetSource, MissingAssetSourceError> {
self.data.sources.get(id.into())
}
#[inline]
pub fn sources(&self) -> &AssetSources {
&self.data.sources
}
async fn log_unrecoverable(&self) {
let mut log = self.data.log.write().await;
let log = log.as_mut().unwrap();
log.unrecoverable().await.unwrap();
}
async fn log_begin_processing(&self, path: &AssetPath<'_>) {
let mut log = self.data.log.write().await;
let log = log.as_mut().unwrap();
log.begin_processing(path).await.unwrap();
}
async fn log_end_processing(&self, path: &AssetPath<'_>) {
let mut log = self.data.log.write().await;
let log = log.as_mut().unwrap();
log.end_processing(path).await.unwrap();
}
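    /// Starts the processor in the background: processes all assets once, then
    /// listens for source change events. Intended to be registered as a startup
    /// system. A minimal sketch, assuming a `bevy_app::App` that already has this
    /// resource inserted (neither is part of this module):
    /// ```ignore
    /// app.add_systems(Startup, AssetProcessor::start);
    /// ```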
pub fn start(_processor: Res<Self>) {
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet.");
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
{
let processor = _processor.clone();
std::thread::spawn(move || {
processor.process_assets();
bevy_tasks::block_on(processor.listen_for_source_change_events());
});
}
}
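    /// Processes all assets in all processed [`AssetSource`]s once, blocking until
    /// the run finishes. This first runs `initialize` to validate the transaction
    /// log and populate asset infos from any existing processed metadata.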
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
pub fn process_assets(&self) {
let start_time = std::time::Instant::now();
debug!("Processing Assets");
IoTaskPool::get().scope(|scope| {
scope.spawn(async move {
self.initialize().await.unwrap();
for source in self.sources().iter_processed() {
self.process_assets_internal(scope, source, PathBuf::from(""))
.await
.unwrap();
}
});
});
bevy_tasks::block_on(self.finish_processing_assets());
let end_time = std::time::Instant::now();
debug!("Processing finished in {:?}", end_time - start_time);
}
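    /// Listens (forever) for [`AssetSourceEvent`]s on every processed source and
    /// reprocesses the affected assets. Each batch of events moves the processor
    /// back into [`ProcessorState::Processing`] until the batch is handled.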
pub async fn listen_for_source_change_events(&self) {
debug!("Listening for changes to source assets");
loop {
let mut started_processing = false;
for source in self.data.sources.iter_processed() {
if let Some(receiver) = source.event_receiver() {
for event in receiver.try_iter() {
if !started_processing {
self.set_state(ProcessorState::Processing).await;
started_processing = true;
}
self.handle_asset_source_event(source, event).await;
}
}
}
if started_processing {
self.finish_processing_assets().await;
}
}
}
pub async fn write_default_meta_file_for_path(
&self,
path: impl Into<AssetPath<'_>>,
) -> Result<(), WriteDefaultMetaError> {
let path = path.into();
let Some(processor) = path
.get_full_extension()
.and_then(|extension| self.get_default_processor(&extension))
else {
return self
.server
.write_default_loader_meta_file_for_path(path)
.await;
};
let meta = processor.default_meta();
let serialized_meta = meta.serialize();
let source = self.get_source(path.source())?;
let reader = source.reader();
match reader.read_meta_bytes(path.path()).await {
Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
            Err(AssetReaderError::NotFound(_)) => {
                // No meta file exists yet, so fall through and write the default one.
            }
Err(AssetReaderError::Io(err)) => {
return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
}
Err(AssetReaderError::HttpError(err)) => {
return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
}
}
let writer = source.writer()?;
writer
.write_meta_bytes(path.path(), &serialized_meta)
.await?;
Ok(())
}
async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
trace!("{event:?}");
match event {
AssetSourceEvent::AddedAsset(path)
| AssetSourceEvent::AddedMeta(path)
| AssetSourceEvent::ModifiedAsset(path)
| AssetSourceEvent::ModifiedMeta(path) => {
self.process_asset(source, path).await;
}
AssetSourceEvent::RemovedAsset(path) => {
self.handle_removed_asset(source, path).await;
}
AssetSourceEvent::RemovedMeta(path) => {
self.handle_removed_meta(source, path).await;
}
AssetSourceEvent::AddedFolder(path) => {
self.handle_added_folder(source, path).await;
}
AssetSourceEvent::RemovedFolder(path) => {
self.handle_removed_folder(source, &path).await;
}
AssetSourceEvent::RenamedAsset { old, new } => {
if old == new {
self.process_asset(source, new).await;
} else {
self.handle_renamed_asset(source, old, new).await;
}
}
AssetSourceEvent::RenamedMeta { old, new } => {
if old == new {
self.process_asset(source, new).await;
} else {
debug!("Meta renamed from {old:?} to {new:?}");
let mut infos = self.data.asset_infos.write().await;
let new_asset_path = AssetPath::from(new).with_source(source.id());
let old_asset_path = AssetPath::from(old).with_source(source.id());
infos.check_reprocess_queue.push_back(old_asset_path);
infos.check_reprocess_queue.push_back(new_asset_path);
}
}
AssetSourceEvent::RenamedFolder { old, new } => {
if old == new {
self.handle_added_folder(source, new).await;
} else {
self.handle_removed_folder(source, &old).await;
self.handle_added_folder(source, new).await;
}
}
AssetSourceEvent::RemovedUnknown { path, is_meta } => {
let processed_reader = source.processed_reader().unwrap();
match processed_reader.is_directory(&path).await {
Ok(is_directory) => {
if is_directory {
self.handle_removed_folder(source, &path).await;
} else if is_meta {
self.handle_removed_meta(source, path).await;
} else {
self.handle_removed_asset(source, path).await;
}
}
Err(err) => {
match err {
                            AssetReaderError::NotFound(_) => {
                                // The path was not found, so no processed version exists; nothing to do.
                            }
AssetReaderError::Io(err) => {
error!(
"Path '{}' was removed, but the destination reader could not determine if it \
was a folder or a file due to the following error: {err}",
AssetPath::from_path(&path).with_source(source.id())
);
}
AssetReaderError::HttpError(status) => {
error!(
"Path '{}' was removed, but the destination reader could not determine if it \
was a folder or a file due to receiving an unexpected HTTP Status {status}",
AssetPath::from_path(&path).with_source(source.id())
);
}
}
}
}
}
}
}
async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
debug!(
"Folder {} was added. Attempting to re-process",
AssetPath::from_path(&path).with_source(source.id())
);
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet.");
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
IoTaskPool::get().scope(|scope| {
scope.spawn(async move {
self.process_assets_internal(scope, source, path)
.await
.unwrap();
});
});
}
async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
debug!(
"Meta for asset {} was removed. Attempting to re-process",
AssetPath::from_path(&path).with_source(source.id())
);
self.process_asset(source, path).await;
}
async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
debug!(
"Removing folder {} because source was removed",
path.display()
);
let processed_reader = source.processed_reader().unwrap();
match processed_reader.read_directory(path).await {
Ok(mut path_stream) => {
while let Some(child_path) = path_stream.next().await {
self.handle_removed_asset(source, child_path).await;
}
}
Err(err) => match err {
                AssetReaderError::NotFound(_err) => {
                    // The processed folder does not exist, so there is nothing to remove.
                }
AssetReaderError::HttpError(status) => {
self.log_unrecoverable().await;
error!(
"Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
);
}
AssetReaderError::Io(err) => {
self.log_unrecoverable().await;
error!(
"Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
);
}
},
}
let processed_writer = source.processed_writer().unwrap();
if let Err(err) = processed_writer.remove_directory(path).await {
match err {
AssetWriterError::Io(err) => {
if err.kind() != ErrorKind::NotFound {
let asset_path = AssetPath::from_path(path).with_source(source.id());
error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
}
}
}
}
}
async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
let asset_path = AssetPath::from(path).with_source(source.id());
debug!("Removing processed {asset_path} because source was removed");
let mut infos = self.data.asset_infos.write().await;
if let Some(info) = infos.get(&asset_path) {
            // Wait for uncontested write access so in-flight readers / writers can
            // finish gracefully before the processed files are removed.
            let _write_lock = info.file_transaction_lock.write().await;
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
infos.remove(&asset_path).await;
}
async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
let mut infos = self.data.asset_infos.write().await;
let old = AssetPath::from(old).with_source(source.id());
let new = AssetPath::from(new).with_source(source.id());
let processed_writer = source.processed_writer().unwrap();
if let Some(info) = infos.get(&old) {
            // Hold the transaction lock while renaming so concurrent operations on
            // the old path finish first.
            let _write_lock = info.file_transaction_lock.write().await;
processed_writer
.rename(old.path(), new.path())
.await
.unwrap();
processed_writer
.rename_meta(old.path(), new.path())
.await
.unwrap();
}
infos.rename(&old, &new).await;
}
async fn finish_processing_assets(&self) {
self.try_reprocessing_queued().await;
self.server.data.infos.write().consume_handle_drop_events();
self.set_state(ProcessorState::Finished).await;
}
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
async fn process_assets_internal<'scope>(
&'scope self,
scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
source: &'scope AssetSource,
path: PathBuf,
) -> Result<(), AssetReaderError> {
if source.reader().is_directory(&path).await? {
let mut path_stream = source.reader().read_directory(&path).await?;
while let Some(path) = path_stream.next().await {
Box::pin(self.process_assets_internal(scope, source, path)).await?;
}
} else {
let processor = self.clone();
scope.spawn(async move {
processor.process_asset(source, path).await;
});
}
Ok(())
}
async fn try_reprocessing_queued(&self) {
loop {
let mut check_reprocess_queue =
core::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
IoTaskPool::get().scope(|scope| {
for path in check_reprocess_queue.drain(..) {
let processor = self.clone();
let source = self.get_source(path.source()).unwrap();
scope.spawn(async move {
processor.process_asset(source, path.into()).await;
});
}
});
let infos = self.data.asset_infos.read().await;
if infos.check_reprocess_queue.is_empty() {
break;
}
}
}
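    /// Registers `processor`, keyed by its [`core::any::type_name`], so meta files
    /// can refer to it by name. A minimal sketch, assuming a hypothetical
    /// `MyProcessor` type that implements [`Process`] (not part of this module):
    /// ```ignore
    /// processor.register_processor(MyProcessor::default());
    /// ```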
pub fn register_processor<P: Process>(&self, processor: P) {
let mut process_plans = self.data.processors.write();
#[cfg(feature = "trace")]
let processor = InstrumentedAssetProcessor(processor);
process_plans.insert(core::any::type_name::<P>(), Arc::new(processor));
}
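    /// Sets the default processor for files with the given `extension`, used when
    /// an asset has no meta file. Continuing the hypothetical `MyProcessor` sketch
    /// above:
    /// ```ignore
    /// processor.set_default_processor::<MyProcessor>("png");
    /// ```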
pub fn set_default_processor<P: Process>(&self, extension: &str) {
let mut default_processors = self.data.default_processors.write();
default_processors.insert(extension.into(), core::any::type_name::<P>());
}
pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
let default_processors = self.data.default_processors.read();
let key = default_processors.get(extension)?;
self.data.processors.read().get(key).cloned()
}
pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
let processors = self.data.processors.read();
processors.get(processor_type_name).cloned()
}
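    /// Builds the initial view of every asset: scans the unprocessed and processed
    /// folders (removing empty processed folders along the way), parses existing
    /// processed meta to recover hashes and process dependencies, and removes
    /// processed data whose source no longer exists or whose meta cannot be read.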
#[cfg_attr(
any(target_arch = "wasm32", not(feature = "multi_threaded")),
expect(
dead_code,
reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM."
)
)]
async fn initialize(&self) -> Result<(), InitializeError> {
self.validate_transaction_log_and_recover().await;
let mut asset_infos = self.data.asset_infos.write().await;
async fn get_asset_paths(
reader: &dyn ErasedAssetReader,
clean_empty_folders_writer: Option<&dyn ErasedAssetWriter>,
path: PathBuf,
paths: &mut Vec<PathBuf>,
) -> Result<bool, AssetReaderError> {
if reader.is_directory(&path).await? {
let mut path_stream = reader.read_directory(&path).await?;
let mut contains_files = false;
while let Some(child_path) = path_stream.next().await {
contains_files |= Box::pin(get_asset_paths(
reader,
clean_empty_folders_writer,
child_path,
paths,
))
.await?;
}
if !contains_files
&& path.parent().is_some()
&& let Some(writer) = clean_empty_folders_writer
{
let _ = writer.remove_empty_directory(&path).await;
}
Ok(contains_files)
} else {
paths.push(path);
Ok(true)
}
}
for source in self.sources().iter_processed() {
let Ok(processed_reader) = source.processed_reader() else {
continue;
};
let Ok(processed_writer) = source.processed_writer() else {
continue;
};
let mut unprocessed_paths = Vec::new();
get_asset_paths(
source.reader(),
None,
PathBuf::from(""),
&mut unprocessed_paths,
)
.await
.map_err(InitializeError::FailedToReadSourcePaths)?;
let mut processed_paths = Vec::new();
get_asset_paths(
processed_reader,
Some(processed_writer),
PathBuf::from(""),
&mut processed_paths,
)
.await
.map_err(InitializeError::FailedToReadDestinationPaths)?;
for path in unprocessed_paths {
asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
}
for path in processed_paths {
let mut dependencies = Vec::new();
let asset_path = AssetPath::from(path).with_source(source.id());
if let Some(info) = asset_infos.get_mut(&asset_path) {
match processed_reader.read_meta_bytes(asset_path.path()).await {
Ok(meta_bytes) => {
match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
Ok(minimal) => {
trace!(
"Populated processed info for asset {asset_path} {:?}",
minimal.processed_info
);
if let Some(processed_info) = &minimal.processed_info {
for process_dependency_info in
&processed_info.process_dependencies
{
dependencies.push(process_dependency_info.path.clone());
}
}
info.processed_info = minimal.processed_info;
}
Err(err) => {
trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
}
}
Err(err) => {
trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
}
} else {
trace!("Removing processed data for non-existent asset {asset_path}");
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
for dependency in dependencies {
asset_infos.add_dependent(&dependency, asset_path.clone());
}
}
}
self.set_state(ProcessorState::Processing).await;
Ok(())
}
async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
            warn!("Failed to remove processed asset {path:?}: {err}");
        }
        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
            warn!("Failed to remove processed meta {path:?}: {err}");
        }
self.clean_empty_processed_ancestor_folders(source, path)
.await;
}
async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
if path.is_absolute() {
error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
return;
}
        // Walk up the ancestor chain, removing each directory that has become empty.
        // The cursor must advance every iteration; otherwise the loop would retry
        // (and then fail on) the same parent forever.
        let mut current = path;
        while let Some(parent) = current.parent() {
            if parent == Path::new("") {
                break;
            }
            if source
                .processed_writer()
                .unwrap()
                .remove_empty_directory(parent)
                .await
                .is_err()
            {
                // If a directory cannot be removed (e.g. it is not empty), stop walking up.
                break;
            }
            current = parent;
        }
}
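    /// Processes the asset at `path` (relative to `source`) and records the result
    /// in [`ProcessorAssetInfos`], which may queue dependents for reprocessing.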
async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
let asset_path = AssetPath::from(path).with_source(source.id());
let result = self.process_asset_internal(source, &asset_path).await;
let mut infos = self.data.asset_infos.write().await;
infos.finish_processing(asset_path, result).await;
}
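    /// The core pipeline for a single asset:
    /// 1. Read the asset bytes and its meta, falling back to the default processor
    ///    for the extension, then the default loader meta, then `Ignore`.
    /// 2. Hash the meta + asset bytes and skip work if neither the hash nor any
    ///    process dependency's full hash has changed.
    /// 3. Under the asset's file transaction lock (bracketed by transaction log
    ///    entries), either run the configured processor or copy the bytes through
    ///    unchanged, then write the final meta containing the new [`ProcessedInfo`].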
async fn process_asset_internal(
&self,
source: &AssetSource,
asset_path: &AssetPath<'static>,
) -> Result<ProcessResult, ProcessError> {
debug!("Processing {}", asset_path);
let server = &self.server;
let path = asset_path.path();
let reader = source.reader();
let reader_err = |err| ProcessError::AssetReaderError {
path: asset_path.clone(),
err,
};
let writer_err = |err| ProcessError::AssetWriterError {
path: asset_path.clone(),
err,
};
let mut byte_reader = reader.read(path).await.map_err(reader_err)?;
let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
Ok(meta_bytes) => {
let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
})?;
let (meta, processor) = match minimal.asset {
AssetActionMinimal::Load { loader } => {
let loader = server.get_asset_loader_with_type_name(&loader).await?;
let meta = loader.deserialize_meta(&meta_bytes)?;
(meta, None)
}
AssetActionMinimal::Process { processor } => {
let processor = self
.get_processor(&processor)
.ok_or_else(|| ProcessError::MissingProcessor(processor))?;
let meta = processor.deserialize_meta(&meta_bytes)?;
(meta, Some(processor))
}
AssetActionMinimal::Ignore => {
return Ok(ProcessResult::Ignored);
}
};
(meta, meta_bytes, processor)
}
Err(AssetReaderError::NotFound(_path)) => {
let (meta, processor) = if let Some(processor) = asset_path
.get_full_extension()
.and_then(|ext| self.get_default_processor(&ext))
{
let meta = processor.default_meta();
(meta, Some(processor))
} else {
match server.get_path_asset_loader(asset_path.clone()).await {
Ok(loader) => (loader.default_meta(), None),
Err(MissingAssetLoaderForExtensionError { .. }) => {
let meta: Box<dyn AssetMetaDyn> =
Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
(meta, None)
}
}
};
let meta_bytes = meta.serialize();
(meta, meta_bytes, processor)
}
Err(err) => {
return Err(ProcessError::ReadAssetMetaError {
path: asset_path.clone(),
err,
})
}
};
let processed_writer = source.processed_writer()?;
let mut asset_bytes = Vec::new();
byte_reader
.read_to_end(&mut asset_bytes)
.await
.map_err(|e| ProcessError::AssetReaderError {
path: asset_path.clone(),
err: AssetReaderError::Io(e.into()),
})?;
let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
let mut new_processed_info = ProcessedInfo {
hash: new_hash,
full_hash: new_hash,
process_dependencies: Vec::new(),
};
{
let infos = self.data.asset_infos.read().await;
if let Some(current_processed_info) = infos
.get(asset_path)
.and_then(|i| i.processed_info.as_ref())
&& current_processed_info.hash == new_hash
{
let mut dependency_changed = false;
            for current_dep_info in &current_processed_info.process_dependencies {
let live_hash = infos
                    .get(&current_dep_info.path)
.and_then(|i| i.processed_info.as_ref())
.map(|i| i.full_hash);
if live_hash != Some(current_dep_info.full_hash) {
dependency_changed = true;
break;
}
}
if !dependency_changed {
return Ok(ProcessResult::SkippedNotChanged);
}
}
}
let _transaction_lock = {
let mut infos = self.data.asset_infos.write().await;
let info = infos.get_or_insert(asset_path.clone());
info.file_transaction_lock.write_arc().await
};
self.log_begin_processing(asset_path).await;
if let Some(processor) = processor {
let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
let mut processed_meta = {
let mut context =
ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
processor
.process(&mut context, source_meta, &mut *writer)
.await?
};
writer
.flush()
.await
.map_err(|e| ProcessError::AssetWriterError {
path: asset_path.clone(),
err: AssetWriterError::Io(e),
})?;
let full_hash = get_full_asset_hash(
new_hash,
new_processed_info
.process_dependencies
.iter()
.map(|i| i.full_hash),
);
new_processed_info.full_hash = full_hash;
*processed_meta.processed_info_mut() = Some(new_processed_info.clone());
let meta_bytes = processed_meta.serialize();
processed_writer
.write_meta_bytes(path, &meta_bytes)
.await
.map_err(writer_err)?;
} else {
processed_writer
.write_bytes(path, &asset_bytes)
.await
.map_err(writer_err)?;
*source_meta.processed_info_mut() = Some(new_processed_info.clone());
let meta_bytes = source_meta.serialize();
processed_writer
.write_meta_bytes(path, &meta_bytes)
.await
.map_err(writer_err)?;
}
self.log_end_processing(asset_path).await;
Ok(ProcessResult::Processed(new_processed_info))
}
async fn validate_transaction_log_and_recover(&self) {
if let Err(err) = ProcessorTransactionLog::validate().await {
let state_is_valid = match err {
ValidateLogError::ReadLogError(err) => {
error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
false
}
ValidateLogError::UnrecoverableError => {
error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
false
}
ValidateLogError::EntryErrors(entry_errors) => {
let mut state_is_valid = true;
for entry_error in entry_errors {
match entry_error {
LogEntryError::DuplicateTransaction(_)
| LogEntryError::EndedMissingTransaction(_) => {
error!("{}", entry_error);
state_is_valid = false;
break;
}
LogEntryError::UnfinishedTransaction(path) => {
debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
error!("Failed to remove asset {path:?}: {message}");
state_is_valid = false;
};
let Ok(source) = self.get_source(path.source()) else {
unrecoverable_err(&"AssetSource does not exist");
continue;
};
let Ok(processed_writer) = source.processed_writer() else {
unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
continue;
};
if let Err(err) = processed_writer.remove(path.path()).await {
match err {
AssetWriterError::Io(err) => {
if err.kind() != ErrorKind::NotFound {
unrecoverable_err(&err);
}
}
}
}
if let Err(err) = processed_writer.remove_meta(path.path()).await {
match err {
AssetWriterError::Io(err) => {
if err.kind() != ErrorKind::NotFound {
unrecoverable_err(&err);
}
}
}
}
}
}
}
state_is_valid
}
};
if !state_is_valid {
error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
for source in self.sources().iter_processed() {
let Ok(processed_writer) = source.processed_writer() else {
continue;
};
if let Err(err) = processed_writer
.remove_assets_in_directory(Path::new(""))
.await
{
panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
}
}
}
}
let mut log = self.data.log.write().await;
*log = match ProcessorTransactionLog::new().await {
Ok(log) => Some(log),
Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
};
}
}
impl AssetProcessorData {
pub fn new(source: AssetSources) -> Self {
let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
finished_sender.set_overflow(true);
initialized_sender.set_overflow(true);
AssetProcessorData {
sources: source,
finished_sender,
finished_receiver,
initialized_sender,
initialized_receiver,
state: async_lock::RwLock::new(ProcessorState::Initializing),
log: Default::default(),
processors: Default::default(),
asset_infos: Default::default(),
default_processors: Default::default(),
}
}
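    /// Waits until the asset at `path` has a final [`ProcessStatus`] for the
    /// current run, then returns it ([`ProcessStatus::NonExistent`] if the asset is
    /// unknown). A minimal sketch, assuming `data: &AssetProcessorData` and an
    /// async context:
    /// ```ignore
    /// let status = data.wait_until_processed(AssetPath::from("images/sprite.png")).await;
    /// assert_eq!(status, ProcessStatus::Processed);
    /// ```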
pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
self.wait_until_initialized().await;
let mut receiver = {
let infos = self.asset_infos.write().await;
let info = infos.get(&path);
match info {
Some(info) => match info.status {
Some(result) => return result,
None => info.status_receiver.clone(),
},
None => return ProcessStatus::NonExistent,
}
};
receiver.recv().await.unwrap()
}
pub async fn wait_until_initialized(&self) {
let receiver = {
let state = self.state.read().await;
match *state {
ProcessorState::Initializing => {
Some(self.initialized_receiver.clone())
}
_ => None,
}
};
if let Some(mut receiver) = receiver {
receiver.recv().await.unwrap();
}
}
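    /// Waits until the current run reaches [`ProcessorState::Finished`]. A blocking
    /// sketch, using [`bevy_tasks::block_on`] the same way `process_assets` does
    /// above:
    /// ```ignore
    /// bevy_tasks::block_on(processor.data().wait_until_finished());
    /// ```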
pub async fn wait_until_finished(&self) {
let receiver = {
let state = self.state.read().await;
match *state {
ProcessorState::Initializing | ProcessorState::Processing => {
Some(self.finished_receiver.clone())
}
ProcessorState::Finished => None,
}
};
if let Some(mut receiver) = receiver {
receiver.recv().await.unwrap();
}
}
}
#[cfg(feature = "trace")]
struct InstrumentedAssetProcessor<T>(T);
#[cfg(feature = "trace")]
impl<T: Process> Process for InstrumentedAssetProcessor<T> {
type Settings = T::Settings;
type OutputLoader = T::OutputLoader;
fn process(
&self,
context: &mut ProcessContext,
meta: AssetMeta<(), Self>,
writer: &mut crate::io::Writer,
) -> impl ConditionalSendFuture<
Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
> {
        // Convert into the inner processor's meta type; this works because the
        // instrumented wrapper shares its `Settings` type with `T`.
        let meta = AssetMeta {
meta_format_version: meta.meta_format_version,
processed_info: meta.processed_info,
asset: meta.asset,
};
let span = info_span!(
"asset processing",
processor = core::any::type_name::<T>(),
asset = context.path().to_string(),
);
self.0.process(context, meta, writer).instrument(span)
}
}
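/// The successful outcome of processing a single asset.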
#[derive(Debug, Clone)]
pub enum ProcessResult {
Processed(ProcessedInfo),
SkippedNotChanged,
Ignored,
}
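/// The terminal status of an asset for the current processing run, as broadcast
/// to [`AssetProcessorData::wait_until_processed`] callers.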
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
Processed,
Failed,
NonExistent,
}
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
processed_info: Option<ProcessedInfo>,
dependents: HashSet<AssetPath<'static>>,
status: Option<ProcessStatus>,
pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
status_sender: async_broadcast::Sender<ProcessStatus>,
status_receiver: async_broadcast::Receiver<ProcessStatus>,
}
impl Default for ProcessorAssetInfo {
fn default() -> Self {
let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
status_sender.set_overflow(true);
Self {
processed_info: Default::default(),
dependents: Default::default(),
file_transaction_lock: Default::default(),
status: None,
status_sender,
status_receiver,
}
}
}
impl ProcessorAssetInfo {
async fn update_status(&mut self, status: ProcessStatus) {
if self.status != Some(status) {
self.status = Some(status);
self.status_sender.broadcast(status).await.unwrap();
}
}
}
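/// The processor's current view of all known assets, including reverse
/// ("dependents") dependency tracking and the queue of assets that may need
/// reprocessing because a dependency changed.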
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
check_reprocess_queue: VecDeque<AssetPath<'static>>,
}
impl ProcessorAssetInfos {
fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
self.infos.entry(asset_path.clone()).or_insert_with(|| {
let mut info = ProcessorAssetInfo::default();
if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
info.dependents = dependents;
}
info
})
}
pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
self.infos.get(asset_path)
}
fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
self.infos.get_mut(asset_path)
}
fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
if let Some(info) = self.get_mut(asset_path) {
info.dependents.insert(dependent);
} else {
let dependents = self
.non_existent_dependents
.entry(asset_path.clone())
.or_default();
dependents.insert(dependent);
}
}
async fn finish_processing(
&mut self,
asset_path: AssetPath<'static>,
result: Result<ProcessResult, ProcessError>,
) {
match result {
Ok(ProcessResult::Processed(processed_info)) => {
debug!("Finished processing \"{}\"", asset_path);
let old_processed_info = self
.infos
.get_mut(&asset_path)
.and_then(|i| i.processed_info.take());
if let Some(old_processed_info) = old_processed_info {
self.clear_dependencies(&asset_path, old_processed_info);
}
for process_dependency_info in &processed_info.process_dependencies {
self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
}
let info = self.get_or_insert(asset_path);
info.processed_info = Some(processed_info);
info.update_status(ProcessStatus::Processed).await;
let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
for path in dependents {
self.check_reprocess_queue.push_back(path);
}
}
Ok(ProcessResult::SkippedNotChanged) => {
debug!("Skipping processing (unchanged) \"{}\"", asset_path);
let info = self.get_mut(&asset_path).expect("info should exist");
info.update_status(ProcessStatus::Processed).await;
}
Ok(ProcessResult::Ignored) => {
debug!("Skipping processing (ignored) \"{}\"", asset_path);
}
            Err(ProcessError::ExtensionRequired) => {
                // Skip assets without extensions; they cannot be matched to a processor.
            }
Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
trace!("No loader found for {asset_path}");
}
Err(ProcessError::AssetReaderError {
err: AssetReaderError::NotFound(_),
..
}) => {
trace!("No need to process asset {asset_path} because it does not exist");
}
Err(err) => {
error!("Failed to process asset {asset_path}: {err}");
                // If processing failed because a dependency could not be loaded, record a
                // placeholder ProcessedInfo and register the dependency so this asset is
                // reprocessed when that dependency is.
                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
err
{
let info = self.get_mut(&asset_path).expect("info should exist");
info.processed_info = Some(ProcessedInfo {
hash: AssetHash::default(),
full_hash: AssetHash::default(),
process_dependencies: vec![],
});
self.add_dependent(dependency.path(), asset_path.to_owned());
}
let info = self.get_mut(&asset_path).expect("info should exist");
info.update_status(ProcessStatus::Failed).await;
}
}
}
async fn remove(&mut self, asset_path: &AssetPath<'static>) {
let info = self.infos.remove(asset_path);
if let Some(info) = info {
if let Some(processed_info) = info.processed_info {
self.clear_dependencies(asset_path, processed_info);
}
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
if !info.dependents.is_empty() {
                error!(
                    "The asset at {asset_path} was removed, but other assets depend on it in order to be processed. Consider updating the path in the following assets: {:?}",
                    info.dependents
                );
self.non_existent_dependents
.insert(asset_path.clone(), info.dependents);
}
}
}
async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
let info = self.infos.remove(old);
if let Some(mut info) = info {
if !info.dependents.is_empty() {
                error!(
                    "The asset at {old} was renamed, but other assets depend on it in order to be processed. Consider updating the path in the following assets: {:?}",
                    info.dependents
                );
self.non_existent_dependents
.insert(old.clone(), core::mem::take(&mut info.dependents));
}
if let Some(processed_info) = &info.processed_info {
for dep in &processed_info.process_dependencies {
if let Some(info) = self.infos.get_mut(&dep.path) {
info.dependents.remove(old);
info.dependents.insert(new.clone());
} else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
{
dependents.remove(old);
dependents.insert(new.clone());
}
}
}
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
let dependents: Vec<AssetPath<'static>> = {
let new_info = self.get_or_insert(new.clone());
new_info.processed_info = info.processed_info;
new_info.status = info.status;
if let Some(status) = new_info.status {
new_info.status_sender.broadcast(status).await.unwrap();
}
new_info.dependents.iter().cloned().collect()
};
self.check_reprocess_queue.push_back(new.clone());
for dependent in dependents {
self.check_reprocess_queue.push_back(dependent);
}
}
}
fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
for old_load_dep in removed_info.process_dependencies {
if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
info.dependents.remove(asset_path);
} else if let Some(dependents) =
self.non_existent_dependents.get_mut(&old_load_dep.path)
{
dependents.remove(asset_path);
}
}
}
}
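/// The lifecycle of an [`AssetProcessor`]: scanning existing state
/// (`Initializing`), actively processing assets (`Processing`), or idle with all
/// known work complete (`Finished`).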
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
Initializing,
Processing,
Finished,
}
#[derive(Error, Debug)]
pub enum InitializeError {
#[error(transparent)]
FailedToReadSourcePaths(AssetReaderError),
#[error(transparent)]
FailedToReadDestinationPaths(AssetReaderError),
#[error("Failed to validate asset log: {0}")]
ValidateLogError(#[from] ValidateLogError),
}