mod log;
mod process;
use async_lock::RwLockReadGuardArc;
pub use log::*;
pub use process::*;
use crate::{
io::{
AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
AssetSources, AssetWriterError, ErasedAssetReader, MissingAssetSourceError,
},
meta::{
get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
},
AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
};
use alloc::{borrow::ToOwned, boxed::Box, string::String, sync::Arc, vec, vec::Vec};
use bevy_ecs::prelude::*;
use bevy_platform::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::{PoisonError, RwLock},
};
use bevy_tasks::IoTaskPool;
use futures_io::ErrorKind;
use futures_lite::{AsyncWriteExt, StreamExt};
use futures_util::{select_biased, FutureExt};
use std::{
path::{Path, PathBuf},
sync::Mutex,
};
use thiserror::Error;
use tracing::{debug, error, trace, warn};
#[cfg(feature = "trace")]
use {
alloc::string::ToString,
tracing::{info_span, instrument::Instrument},
};
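/// A "background" asset processor that reads assets from their [`AssetSource`], runs them through
/// their configured [`Process`] implementation (or copies them verbatim), and writes the results
/// to the source's processed output. Clones are cheap and share the same [`AssetProcessorData`].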
#[derive(Resource, Clone)]
pub struct AssetProcessor {
server: AssetServer,
pub(crate) data: Arc<AssetProcessorData>,
}
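/// The internal, shareable state of an [`AssetProcessor`]: the current [`ProcessingState`], the
/// transaction log (and the factory used to create it), the registered [`Process`]
/// implementations, and the configured [`AssetSources`].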
pub struct AssetProcessorData {
pub(crate) processing_state: Arc<ProcessingState>,
log_factory: Mutex<Option<Box<dyn ProcessorTransactionLogFactory>>>,
log: async_lock::RwLock<Option<Box<dyn ProcessorTransactionLog>>>,
processors: RwLock<Processors>,
sources: Arc<AssetSources>,
}
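/// Tracks the processor's lifecycle phase ([`ProcessorState`]) and per-asset bookkeeping
/// ([`ProcessorAssetInfos`]), broadcasting events when the processor becomes initialized and when
/// it finishes so other tasks can await those transitions.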
pub(crate) struct ProcessingState {
state: async_lock::RwLock<ProcessorState>,
initialized_sender: async_broadcast::Sender<()>,
initialized_receiver: async_broadcast::Receiver<()>,
finished_sender: async_broadcast::Sender<()>,
finished_receiver: async_broadcast::Receiver<()>,
asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
}
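/// The set of registered [`Process`] implementations, indexed by full type path, by short type
/// path (which may be ambiguous), and by the file extensions they serve as the default
/// processor for.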
#[derive(Default)]
struct Processors {
type_path_to_processor: HashMap<&'static str, Arc<dyn ErasedProcessor>>,
short_type_path_to_processor: HashMap<&'static str, ShortTypeProcessorEntry>,
file_extension_to_default_processor: HashMap<Box<str>, &'static str>,
}
enum ShortTypeProcessorEntry {
Unique {
type_path: &'static str,
processor: Arc<dyn ErasedProcessor>,
},
Ambiguous(Vec<&'static str>),
}
impl AssetProcessor {
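    /// Creates a new [`AssetProcessor`] and the [`AssetSources`] it manages from the given source
    /// builders. The returned processor drives its own [`AssetServer`] configured with
    /// [`AssetServerMode::Processed`] and [`AssetMetaCheck::Always`].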
pub fn new(
sources: &mut AssetSourceBuilders,
watch_processed: bool,
) -> (Self, Arc<AssetSources>) {
let state = Arc::new(ProcessingState::new());
let mut sources = sources.build_sources(true, watch_processed);
sources.gate_on_processor(state.clone());
let sources = Arc::new(sources);
let data = Arc::new(AssetProcessorData::new(sources.clone(), state));
let server = AssetServer::new_with_meta_check(
sources.clone(),
AssetServerMode::Processed,
AssetMetaCheck::Always,
false,
UnapprovedPathMode::default(),
);
(Self { server, data }, sources)
}
pub fn data(&self) -> &Arc<AssetProcessorData> {
&self.data
}
pub fn server(&self) -> &AssetServer {
&self.server
}
pub async fn get_state(&self) -> ProcessorState {
self.data.processing_state.get_state().await
}
#[inline]
pub fn get_source<'a>(
&self,
id: impl Into<AssetSourceId<'a>>,
) -> Result<&AssetSource, MissingAssetSourceError> {
self.data.sources.get(id.into())
}
#[inline]
pub fn sources(&self) -> &AssetSources {
&self.data.sources
}
async fn log_unrecoverable(&self) {
let mut log = self.data.log.write().await;
let log = log.as_mut().unwrap();
log.unrecoverable()
.await
.map_err(|error| WriteLogError {
log_entry: LogEntry::UnrecoverableError,
error,
})
.unwrap();
}
async fn log_begin_processing(&self, path: &AssetPath<'_>) {
let mut log = self.data.log.write().await;
let log = log.as_mut().unwrap();
log.begin_processing(path)
.await
.map_err(|error| WriteLogError {
log_entry: LogEntry::BeginProcessing(path.clone_owned()),
error,
})
.unwrap();
}
async fn log_end_processing(&self, path: &AssetPath<'_>) {
let mut log = self.data.log.write().await;
let log = log.as_mut().unwrap();
log.end_processing(path)
.await
.map_err(|error| WriteLogError {
log_entry: LogEntry::EndProcessing(path.clone_owned()),
error,
})
.unwrap();
}
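    /// Starts the processor on the [`IoTaskPool`]: initializes state (recovering from the
    /// transaction log if needed), queues a processing task for every asset in every processed
    /// source, drives those tasks to completion, and then listens for source change events so
    /// changed assets are reprocessed.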
pub fn start(processor: Res<Self>) {
let processor = processor.clone();
IoTaskPool::get()
.spawn(async move {
let start_time = std::time::Instant::now();
debug!("Processing Assets");
processor.initialize().await.unwrap();
let (new_task_sender, new_task_receiver) = async_channel::unbounded();
processor
.queue_initial_processing_tasks(&new_task_sender)
.await;
{
let processor = processor.clone();
let new_task_sender = new_task_sender.clone();
IoTaskPool::get()
.spawn(async move {
processor
.execute_processing_tasks(new_task_sender, new_task_receiver)
.await;
})
.detach();
}
processor.data.wait_until_finished().await;
let end_time = std::time::Instant::now();
debug!("Processing finished in {:?}", end_time - start_time);
debug!("Listening for changes to source assets");
processor.spawn_source_change_event_listeners(&new_task_sender);
})
.detach();
}
async fn queue_initial_processing_tasks(
&self,
sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
for source in self.sources().iter_processed() {
self.queue_processing_tasks_for_folder(source, PathBuf::from(""), sender)
.await
.unwrap();
}
}
fn spawn_source_change_event_listeners(
&self,
sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
for source in self.data.sources.iter_processed() {
let Some(receiver) = source.event_receiver().cloned() else {
continue;
};
let source_id = source.id();
let processor = self.clone();
let sender = sender.clone();
IoTaskPool::get()
.spawn(async move {
while let Ok(event) = receiver.recv().await {
let Ok(source) = processor.get_source(source_id.clone()) else {
return;
};
processor
.handle_asset_source_event(source, event, &sender)
.await;
}
})
.detach();
}
}
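    /// Drives the processing task loop: spawns a task for each queued `(source, path)` pair and
    /// tracks how many are still pending. Only a weak handle to the task sender is kept here so
    /// the channel can close once all strong senders are dropped; whenever the pending count
    /// returns to zero, the processor transitions to [`ProcessorState::Finished`].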
async fn execute_processing_tasks(
&self,
new_task_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
new_task_receiver: async_channel::Receiver<(AssetSourceId<'static>, PathBuf)>,
) {
let new_task_sender = {
let weak_sender = new_task_sender.downgrade();
drop(new_task_sender);
weak_sender
};
if new_task_receiver.is_empty() {
self.data
.processing_state
.set_state(ProcessorState::Finished)
.await;
}
enum ProcessorTaskEvent {
Start(AssetSourceId<'static>, PathBuf),
Finished,
}
let (task_finished_sender, task_finished_receiver) = async_channel::unbounded::<()>();
let mut pending_tasks = 0;
while let Ok(event) = {
select_biased! {
result = new_task_receiver.recv().fuse() => {
result.map(|(source_id, path)| ProcessorTaskEvent::Start(source_id, path))
},
result = task_finished_receiver.recv().fuse() => {
result.map(|()| ProcessorTaskEvent::Finished)
}
}
} {
match event {
ProcessorTaskEvent::Start(source_id, path) => {
let Some(new_task_sender) = new_task_sender.upgrade() else {
continue;
};
let processor = self.clone();
let task_finished_sender = task_finished_sender.clone();
pending_tasks += 1;
IoTaskPool::get()
.spawn(async move {
let Ok(source) = processor.get_source(source_id) else {
return;
};
processor.process_asset(source, path, new_task_sender).await;
let _ = task_finished_sender.send(()).await;
})
.detach();
self.data
.processing_state
.set_state(ProcessorState::Processing)
.await;
}
ProcessorTaskEvent::Finished => {
pending_tasks -= 1;
if pending_tasks == 0 {
self.server.write_infos().consume_handle_drop_events();
self.data
.processing_state
.set_state(ProcessorState::Finished)
.await;
}
}
}
}
}
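    /// Writes a default meta file for the asset at `path`, preferring the default processor
    /// registered for the asset's full extension and falling back to the default loader meta
    /// otherwise. Returns an error if a meta file already exists.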
pub async fn write_default_meta_file_for_path(
&self,
path: impl Into<AssetPath<'_>>,
) -> Result<(), WriteDefaultMetaError> {
let path = path.into();
let Some(processor) = path
.get_full_extension()
.and_then(|extension| self.get_default_processor(&extension))
else {
return self
.server
.write_default_loader_meta_file_for_path(path)
.await;
};
let meta = processor.default_meta();
let serialized_meta = meta.serialize();
let source = self.get_source(path.source())?;
let reader = source.reader();
match reader.read_meta_bytes(path.path()).await {
Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
            Err(AssetReaderError::NotFound(_)) => {
                // No meta file exists yet, so we are free to write a fresh default one below.
            }
Err(AssetReaderError::Io(err)) => {
return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
}
Err(AssetReaderError::HttpError(err)) => {
return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
}
}
let writer = source.writer()?;
writer
.write_meta_bytes(path.path(), &serialized_meta)
.await?;
Ok(())
}
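    /// Reacts to a single [`AssetSourceEvent`], queuing reprocessing tasks and/or cleaning up
    /// processed artifacts as appropriate for the kind of change.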
async fn handle_asset_source_event(
&self,
source: &AssetSource,
event: AssetSourceEvent,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
trace!("{event:?}");
match event {
AssetSourceEvent::AddedAsset(path)
| AssetSourceEvent::AddedMeta(path)
| AssetSourceEvent::ModifiedAsset(path)
| AssetSourceEvent::ModifiedMeta(path) => {
let _ = new_task_sender.send((source.id(), path)).await;
}
AssetSourceEvent::RemovedAsset(path) => {
self.handle_removed_asset(source, path).await;
}
AssetSourceEvent::RemovedMeta(path) => {
self.handle_removed_meta(source, path, new_task_sender)
.await;
}
AssetSourceEvent::AddedFolder(path) => {
self.handle_added_folder(source, path, new_task_sender)
.await;
}
AssetSourceEvent::RemovedFolder(path) => {
self.handle_removed_folder(source, &path).await;
}
AssetSourceEvent::RenamedAsset { old, new } => {
if old == new {
let _ = new_task_sender.send((source.id(), new)).await;
} else {
self.handle_renamed_asset(source, old, new, new_task_sender)
.await;
}
}
AssetSourceEvent::RenamedMeta { old, new } => {
if old == new {
let _ = new_task_sender.send((source.id(), new)).await;
} else {
debug!("Meta renamed from {old:?} to {new:?}");
let _ = new_task_sender.send((source.id(), old)).await;
let _ = new_task_sender.send((source.id(), new)).await;
}
}
AssetSourceEvent::RenamedFolder { old, new } => {
if old == new {
self.handle_added_folder(source, new, new_task_sender).await;
} else {
self.handle_removed_folder(source, &old).await;
self.handle_added_folder(source, new, new_task_sender).await;
}
}
AssetSourceEvent::RemovedUnknown { path, is_meta } => {
let processed_reader = source.ungated_processed_reader().unwrap();
match processed_reader.is_directory(&path).await {
Ok(is_directory) => {
if is_directory {
self.handle_removed_folder(source, &path).await;
} else if is_meta {
self.handle_removed_meta(source, path, new_task_sender)
.await;
} else {
self.handle_removed_asset(source, path).await;
}
}
Err(err) => {
match err {
                            AssetReaderError::NotFound(_) => {
                                // The path is already gone from the processed output; nothing to do.
                            }
AssetReaderError::Io(err) => {
error!(
"Path '{}' was removed, but the destination reader could not determine if it \
was a folder or a file due to the following error: {err}",
AssetPath::from_path(&path).with_source(source.id())
);
}
AssetReaderError::HttpError(status) => {
error!(
"Path '{}' was removed, but the destination reader could not determine if it \
was a folder or a file due to receiving an unexpected HTTP Status {status}",
AssetPath::from_path(&path).with_source(source.id())
);
}
}
}
}
}
}
}
async fn handle_added_folder(
&self,
source: &AssetSource,
path: PathBuf,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
debug!(
"Folder {} was added. Attempting to re-process",
AssetPath::from_path(&path).with_source(source.id())
);
self.queue_processing_tasks_for_folder(source, path, new_task_sender)
.await
.unwrap();
}
async fn handle_removed_meta(
&self,
source: &AssetSource,
path: PathBuf,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
debug!(
"Meta for asset {} was removed. Attempting to re-process",
AssetPath::from_path(&path).with_source(source.id())
);
let _ = new_task_sender.send((source.id(), path)).await;
}
async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
debug!(
"Removing folder {} because source was removed",
path.display()
);
let processed_reader = source.ungated_processed_reader().unwrap();
match processed_reader.read_directory(path).await {
Ok(mut path_stream) => {
while let Some(child_path) = path_stream.next().await {
self.handle_removed_asset(source, child_path).await;
}
}
Err(err) => match err {
                AssetReaderError::NotFound(_err) => {
                    // The processed folder is already gone; nothing to clean up.
                }
AssetReaderError::HttpError(status) => {
self.log_unrecoverable().await;
error!(
"Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
);
}
AssetReaderError::Io(err) => {
self.log_unrecoverable().await;
error!(
"Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
);
}
},
}
let processed_writer = source.processed_writer().unwrap();
if let Err(err) = processed_writer.remove_directory(path).await {
match err {
AssetWriterError::Io(err) => {
if err.kind() != ErrorKind::NotFound {
let asset_path = AssetPath::from_path(path).with_source(source.id());
error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
}
}
}
}
}
async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
let asset_path = AssetPath::from(path).with_source(source.id());
debug!("Removing processed {asset_path} because source was removed");
let lock = {
let mut infos = self.data.processing_state.asset_infos.write().await;
infos.remove(&asset_path).await
};
let Some(lock) = lock else {
return;
};
let _write_lock = lock.write();
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
async fn handle_renamed_asset(
&self,
source: &AssetSource,
old: PathBuf,
new: PathBuf,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
let old = AssetPath::from(old).with_source(source.id());
let new = AssetPath::from(new).with_source(source.id());
let processed_writer = source.processed_writer().unwrap();
let result = {
let mut infos = self.data.processing_state.asset_infos.write().await;
infos.rename(&old, &new, new_task_sender).await
};
let Some((old_lock, new_lock)) = result else {
return;
};
let _old_write_lock = old_lock.write();
let _new_write_lock = new_lock.write();
processed_writer
.rename(old.path(), new.path())
.await
.unwrap();
processed_writer
.rename_meta(old.path(), new.path())
.await
.unwrap();
}
async fn queue_processing_tasks_for_folder(
&self,
source: &AssetSource,
path: PathBuf,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) -> Result<(), AssetReaderError> {
if source.reader().is_directory(&path).await? {
let mut path_stream = source.reader().read_directory(&path).await?;
while let Some(path) = path_stream.next().await {
Box::pin(self.queue_processing_tasks_for_folder(source, path, new_task_sender))
.await?;
}
} else {
let _ = new_task_sender.send((source.id(), path)).await;
}
Ok(())
}
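    /// Registers a new [`Process`] implementation under its full type path, and under its short
    /// type path when that is unambiguous. A short type path shared by multiple processors is
    /// recorded as ambiguous and must be disambiguated with the full type path.
    ///
    /// A minimal sketch, assuming a hypothetical `MyProcessor` type that implements [`Process`]:
    /// ```ignore
    /// asset_processor.register_processor(MyProcessor::default());
    /// ```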
pub fn register_processor<P: Process>(&self, processor: P) {
let mut processors = self
.data
.processors
.write()
.unwrap_or_else(PoisonError::into_inner);
let processor = Arc::new(processor);
processors
.type_path_to_processor
.insert(P::type_path(), processor.clone());
match processors
.short_type_path_to_processor
.entry(P::short_type_path())
{
Entry::Vacant(entry) => {
entry.insert(ShortTypeProcessorEntry::Unique {
type_path: P::type_path(),
processor,
});
}
Entry::Occupied(mut entry) => match entry.get_mut() {
ShortTypeProcessorEntry::Unique { type_path, .. } => {
let type_path = *type_path;
*entry.get_mut() =
ShortTypeProcessorEntry::Ambiguous(vec![type_path, P::type_path()]);
}
ShortTypeProcessorEntry::Ambiguous(type_paths) => {
type_paths.push(P::type_path());
}
},
}
}
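    /// Sets the default processor for files with the given `extension`. Source assets that have
    /// no meta file are processed with the default processor registered for their full extension,
    /// if any.
    ///
    /// A minimal sketch, continuing the hypothetical `MyProcessor` example:
    /// ```ignore
    /// asset_processor.set_default_processor::<MyProcessor>("png");
    /// ```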
pub fn set_default_processor<P: Process>(&self, extension: &str) {
let mut processors = self
.data
.processors
.write()
.unwrap_or_else(PoisonError::into_inner);
processors
.file_extension_to_default_processor
.insert(extension.into(), P::type_path());
}
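    /// Returns the default processor registered for the given file `extension`, if one exists.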
pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
let processors = self
.data
.processors
.read()
.unwrap_or_else(PoisonError::into_inner);
let key = processors
.file_extension_to_default_processor
.get(extension)?;
processors.type_path_to_processor.get(key).cloned()
}
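    /// Returns the processor registered under the given full or short type path. Errors if no
    /// processor matches, or if a short type path is shared by multiple processors.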
pub fn get_processor(
&self,
processor_type_name: &str,
) -> Result<Arc<dyn ErasedProcessor>, GetProcessorError> {
let processors = self
.data
.processors
.read()
.unwrap_or_else(PoisonError::into_inner);
if let Some(short_type_processor) = processors
.short_type_path_to_processor
.get(processor_type_name)
{
return match short_type_processor {
ShortTypeProcessorEntry::Unique { processor, .. } => Ok(processor.clone()),
ShortTypeProcessorEntry::Ambiguous(examples) => Err(GetProcessorError::Ambiguous {
processor_short_name: processor_type_name.to_owned(),
ambiguous_processor_names: examples.clone(),
}),
};
}
processors
.type_path_to_processor
.get(processor_type_name)
.cloned()
.ok_or_else(|| GetProcessorError::Missing(processor_type_name.to_owned()))
}
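    /// Initializes the processor: validates (and if necessary recovers from) the transaction log,
    /// scans the source and processed folders of every processed [`AssetSource`], deletes empty
    /// processed directories and processed artifacts whose source asset or meta is gone or
    /// unparseable, rebuilds [`ProcessorAssetInfos`] from the surviving processed meta, and
    /// transitions to [`ProcessorState::Processing`].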
async fn initialize(&self) -> Result<(), InitializeError> {
self.validate_transaction_log_and_recover().await;
let mut asset_infos = self.data.processing_state.asset_infos.write().await;
async fn get_asset_paths(
reader: &dyn ErasedAssetReader,
path: PathBuf,
paths: &mut Vec<PathBuf>,
mut empty_dirs: Option<&mut Vec<PathBuf>>,
) -> Result<bool, AssetReaderError> {
if reader.is_directory(&path).await? {
let mut path_stream = reader.read_directory(&path).await?;
let mut contains_files = false;
while let Some(child_path) = path_stream.next().await {
contains_files |= Box::pin(get_asset_paths(
reader,
child_path,
paths,
empty_dirs.as_deref_mut(),
))
.await?;
}
if !contains_files
&& path.parent().is_some()
&& let Some(empty_dirs) = empty_dirs
{
empty_dirs.push(path);
}
Ok(contains_files)
} else {
paths.push(path);
Ok(true)
}
}
for source in self.sources().iter_processed() {
let Some(processed_reader) = source.ungated_processed_reader() else {
continue;
};
let Ok(processed_writer) = source.processed_writer() else {
continue;
};
let mut unprocessed_paths = Vec::new();
get_asset_paths(
source.reader(),
PathBuf::from(""),
&mut unprocessed_paths,
None,
)
.await
.map_err(InitializeError::FailedToReadSourcePaths)?;
let mut processed_paths = Vec::new();
let mut empty_dirs = Vec::new();
get_asset_paths(
processed_reader,
PathBuf::from(""),
&mut processed_paths,
Some(&mut empty_dirs),
)
.await
.map_err(InitializeError::FailedToReadDestinationPaths)?;
for empty_dir in empty_dirs {
let _ = processed_writer.remove_empty_directory(&empty_dir).await;
}
for path in unprocessed_paths {
asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
}
for path in processed_paths {
let mut dependencies = Vec::new();
let asset_path = AssetPath::from(path).with_source(source.id());
if let Some(info) = asset_infos.get_mut(&asset_path) {
match processed_reader.read_meta_bytes(asset_path.path()).await {
Ok(meta_bytes) => {
match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
Ok(minimal) => {
trace!(
"Populated processed info for asset {asset_path} {:?}",
minimal.processed_info
);
if let Some(processed_info) = &minimal.processed_info {
for process_dependency_info in
&processed_info.process_dependencies
{
dependencies.push(process_dependency_info.path.clone());
}
}
info.processed_info = minimal.processed_info;
}
Err(err) => {
trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
}
}
Err(err) => {
trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
}
} else {
trace!("Removing processed data for non-existent asset {asset_path}");
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
for dependency in dependencies {
asset_infos.add_dependent(&dependency, asset_path.clone());
}
}
}
self.data
.processing_state
.set_state(ProcessorState::Processing)
.await;
Ok(())
}
async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
if let Err(err) = source.processed_writer().unwrap().remove(path).await {
warn!("Failed to remove non-existent asset {path:?}: {err}");
}
if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
warn!("Failed to remove non-existent meta {path:?}: {err}");
}
self.clean_empty_processed_ancestor_folders(source, path)
.await;
}
async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
if path.is_absolute() {
error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
return;
}
        // Walk up the ancestor chain, removing each directory for as long as it remains empty.
        let mut parent = path.parent();
        while let Some(dir) = parent {
            if dir == Path::new("") {
                break;
            }
            if source
                .processed_writer()
                .unwrap()
                .remove_empty_directory(dir)
                .await
                .is_err()
            {
                // The directory is non-empty (or could not be removed), so stop walking up.
                break;
            }
            parent = dir.parent();
        }
}
async fn process_asset(
&self,
source: &AssetSource,
path: PathBuf,
        new_task_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
let asset_path = AssetPath::from(path).with_source(source.id());
let result = self.process_asset_internal(source, &asset_path).await;
let mut infos = self.data.processing_state.asset_infos.write().await;
infos
            .finish_processing(asset_path, result, new_task_sender)
.await;
}
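    /// Processes a single asset: reads its meta (or synthesizes a default), hashes the source
    /// bytes, and skips work when neither the asset nor any of its process dependencies changed
    /// since the last run. Otherwise it logs the transaction, runs the configured processor (or
    /// copies the asset verbatim when none applies), and writes the processed bytes and meta.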
async fn process_asset_internal(
&self,
source: &AssetSource,
asset_path: &AssetPath<'static>,
) -> Result<ProcessResult, ProcessError> {
debug!("Processing {}", asset_path);
let server = &self.server;
let path = asset_path.path();
let reader = source.reader();
let reader_err = |err| ProcessError::AssetReaderError {
path: asset_path.clone(),
err,
};
let writer_err = |err| ProcessError::AssetWriterError {
path: asset_path.clone(),
err,
};
let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
Ok(meta_bytes) => {
let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
})?;
let (meta, processor) = match minimal.asset {
AssetActionMinimal::Load { loader } => {
let loader = server.get_asset_loader_with_type_name(&loader).await?;
let meta = loader.deserialize_meta(&meta_bytes)?;
(meta, None)
}
AssetActionMinimal::Process { processor } => {
let processor = self.get_processor(&processor)?;
let meta = processor.deserialize_meta(&meta_bytes)?;
(meta, Some(processor))
}
AssetActionMinimal::Ignore => {
return Ok(ProcessResult::Ignored);
}
};
(meta, meta_bytes, processor)
}
Err(AssetReaderError::NotFound(_path)) => {
let (meta, processor) = if let Some(processor) = asset_path
.get_full_extension()
.and_then(|ext| self.get_default_processor(&ext))
{
let meta = processor.default_meta();
(meta, Some(processor))
} else {
match server.get_path_asset_loader(asset_path.clone()).await {
Ok(loader) => (loader.default_meta(), None),
Err(MissingAssetLoaderForExtensionError { .. }) => {
let meta: Box<dyn AssetMetaDyn> =
Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
(meta, None)
}
}
};
let meta_bytes = meta.serialize();
(meta, meta_bytes, processor)
}
Err(err) => {
return Err(ProcessError::ReadAssetMetaError {
path: asset_path.clone(),
err,
})
}
};
let processed_writer = source.processed_writer()?;
let new_hash = {
let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;
get_asset_hash(&meta_bytes, &mut reader_for_hash)
.await
.map_err(reader_err)?
};
let mut new_processed_info = ProcessedInfo {
hash: new_hash,
full_hash: new_hash,
process_dependencies: Vec::new(),
};
{
let infos = self.data.processing_state.asset_infos.read().await;
if let Some(current_processed_info) = infos
.get(asset_path)
.and_then(|i| i.processed_info.as_ref())
&& current_processed_info.hash == new_hash
{
let mut dependency_changed = false;
                for current_dep_info in &current_processed_info.process_dependencies {
                    let live_hash = infos
                        .get(&current_dep_info.path)
.and_then(|i| i.processed_info.as_ref())
.map(|i| i.full_hash);
if live_hash != Some(current_dep_info.full_hash) {
dependency_changed = true;
break;
}
}
if !dependency_changed {
return Ok(ProcessResult::SkippedNotChanged);
}
}
}
let _transaction_lock = {
let lock = {
let mut infos = self.data.processing_state.asset_infos.write().await;
let info = infos.get_or_insert(asset_path.clone());
info.file_transaction_lock.clone()
};
lock.write_arc().await
};
self.log_begin_processing(asset_path).await;
if let Some(processor) = processor {
let settings = source_meta.process_settings().unwrap();
let reader_for_process = reader.read(path).await.map_err(reader_err)?;
let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
let mut processed_meta = {
let mut context = ProcessContext::new(
self,
asset_path,
reader_for_process,
&mut new_processed_info,
);
let process = processor.process(&mut context, settings, &mut *writer);
#[cfg(feature = "trace")]
let process = {
let span = info_span!(
"asset processing",
processor = processor.type_path(),
asset = asset_path.to_string(),
);
process.instrument(span)
};
process.await?
};
writer
.flush()
.await
.map_err(|e| ProcessError::AssetWriterError {
path: asset_path.clone(),
err: AssetWriterError::Io(e),
})?;
let full_hash = get_full_asset_hash(
new_hash,
new_processed_info
.process_dependencies
.iter()
.map(|i| i.full_hash),
);
new_processed_info.full_hash = full_hash;
*processed_meta.processed_info_mut() = Some(new_processed_info.clone());
let meta_bytes = processed_meta.serialize();
processed_writer
.write_meta_bytes(path, &meta_bytes)
.await
.map_err(writer_err)?;
} else {
let mut reader_for_copy = reader.read(path).await.map_err(reader_err)?;
let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
futures_lite::io::copy(&mut reader_for_copy, &mut writer)
.await
.map_err(|err| ProcessError::AssetWriterError {
path: asset_path.clone_owned(),
err: err.into(),
})?;
*source_meta.processed_info_mut() = Some(new_processed_info.clone());
let meta_bytes = source_meta.serialize();
processed_writer
.write_meta_bytes(path, &meta_bytes)
.await
.map_err(writer_err)?;
}
self.log_end_processing(asset_path).await;
Ok(ProcessResult::Processed(new_processed_info))
}
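    /// Validates the transaction log left by the previous run and recovers from unclean
    /// shutdowns: assets with unfinished transactions have their processed artifacts removed, and
    /// if the log is unreadable or recorded an unrecoverable error, all processed assets are
    /// deleted so they can be regenerated from scratch. A fresh log is then created for this run.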
async fn validate_transaction_log_and_recover(&self) {
let log_factory = self
.data
.log_factory
.lock()
.unwrap_or_else(PoisonError::into_inner)
.take()
.expect("the asset processor only starts once");
if let Err(err) = validate_transaction_log(log_factory.as_ref()).await {
let state_is_valid = match err {
ValidateLogError::ReadLogError(err) => {
error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
false
}
ValidateLogError::UnrecoverableError => {
error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
false
}
ValidateLogError::EntryErrors(entry_errors) => {
let mut state_is_valid = true;
for entry_error in entry_errors {
match entry_error {
LogEntryError::DuplicateTransaction(_)
| LogEntryError::EndedMissingTransaction(_) => {
error!("{}", entry_error);
state_is_valid = false;
break;
}
LogEntryError::UnfinishedTransaction(path) => {
debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
error!("Failed to remove asset {path:?}: {message}");
state_is_valid = false;
};
let Ok(source) = self.get_source(path.source()) else {
unrecoverable_err(&"AssetSource does not exist");
continue;
};
let Ok(processed_writer) = source.processed_writer() else {
unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
continue;
};
if let Err(err) = processed_writer.remove(path.path()).await {
match err {
AssetWriterError::Io(err) => {
if err.kind() != ErrorKind::NotFound {
unrecoverable_err(&err);
}
}
}
}
if let Err(err) = processed_writer.remove_meta(path.path()).await {
match err {
AssetWriterError::Io(err) => {
if err.kind() != ErrorKind::NotFound {
unrecoverable_err(&err);
}
}
}
}
}
}
}
state_is_valid
}
};
if !state_is_valid {
error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
for source in self.sources().iter_processed() {
let Ok(processed_writer) = source.processed_writer() else {
continue;
};
if let Err(err) = processed_writer
.remove_assets_in_directory(Path::new(""))
.await
{
panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
}
}
}
}
let mut log = self.data.log.write().await;
*log = match log_factory.create_new_log().await {
Ok(log) => Some(log),
Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
};
}
}
impl AssetProcessorData {
pub(crate) fn new(sources: Arc<AssetSources>, processing_state: Arc<ProcessingState>) -> Self {
AssetProcessorData {
processing_state,
sources,
log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))),
log: Default::default(),
processors: Default::default(),
}
}
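    /// Overrides the factory used to create the processor's transaction log. Fails if the
    /// processor has already consumed the factory (i.e. it has already started).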
pub fn set_log_factory(
&self,
factory: Box<dyn ProcessorTransactionLogFactory>,
) -> Result<(), SetTransactionLogFactoryError> {
let mut log_factory = self
.log_factory
.lock()
.unwrap_or_else(PoisonError::into_inner);
if log_factory.is_none() {
return Err(SetTransactionLogFactoryError::AlreadyInUse);
}
*log_factory = Some(factory);
Ok(())
}
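    /// Waits until the asset at `path` has been fully processed, returning its final
    /// [`ProcessStatus`].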
pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
self.processing_state.wait_until_processed(path).await
}
pub async fn wait_until_initialized(&self) {
self.processing_state.wait_until_initialized().await;
}
pub async fn wait_until_finished(&self) {
self.processing_state.wait_until_finished().await;
}
}
impl ProcessingState {
fn new() -> Self {
let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
initialized_sender.set_overflow(true);
finished_sender.set_overflow(true);
Self {
state: async_lock::RwLock::new(ProcessorState::Initializing),
initialized_sender,
initialized_receiver,
finished_sender,
finished_receiver,
asset_infos: Default::default(),
}
}
async fn set_state(&self, state: ProcessorState) {
let mut state_guard = self.state.write().await;
let last_state = *state_guard;
*state_guard = state;
if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
self.finished_sender.broadcast(()).await.unwrap();
} else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
self.initialized_sender.broadcast(()).await.unwrap();
}
}
pub(crate) async fn get_state(&self) -> ProcessorState {
*self.state.read().await
}
pub(crate) async fn get_transaction_lock(
&self,
path: &AssetPath<'static>,
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
let lock = {
let infos = self.asset_infos.read().await;
let info = infos
.get(path)
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
info.file_transaction_lock.clone()
};
Ok(lock.read_arc().await)
}
pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
self.wait_until_initialized().await;
let mut receiver = {
let infos = self.asset_infos.write().await;
let info = infos.get(&path);
match info {
Some(info) => match info.status {
Some(result) => return result,
None => info.status_receiver.clone(),
},
None => return ProcessStatus::NonExistent,
}
};
receiver.recv().await.unwrap()
}
pub(crate) async fn wait_until_initialized(&self) {
let receiver = {
let state = self.state.read().await;
match *state {
ProcessorState::Initializing => {
Some(self.initialized_receiver.clone())
}
_ => None,
}
};
if let Some(mut receiver) = receiver {
receiver.recv().await.unwrap();
}
}
pub(crate) async fn wait_until_finished(&self) {
let receiver = {
let state = self.state.read().await;
match *state {
ProcessorState::Initializing | ProcessorState::Processing => {
Some(self.finished_receiver.clone())
}
ProcessorState::Finished => None,
}
};
if let Some(mut receiver) = receiver {
receiver.recv().await.unwrap();
}
}
}
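/// The result of processing a single asset: it was processed, skipped because nothing changed
/// since the last run, or ignored per its meta.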
#[derive(Debug, Clone)]
pub enum ProcessResult {
Processed(ProcessedInfo),
SkippedNotChanged,
Ignored,
}
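/// The terminal processing status of an asset: processed successfully, failed, or non-existent.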
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
Processed,
Failed,
NonExistent,
}
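/// The processor's bookkeeping for a single asset: its last [`ProcessedInfo`], the assets that
/// depend on it, its current [`ProcessStatus`], and a transaction lock guarding access to its
/// processed artifacts.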
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
processed_info: Option<ProcessedInfo>,
dependents: HashSet<AssetPath<'static>>,
status: Option<ProcessStatus>,
pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
status_sender: async_broadcast::Sender<ProcessStatus>,
status_receiver: async_broadcast::Receiver<ProcessStatus>,
}
impl Default for ProcessorAssetInfo {
fn default() -> Self {
let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
status_sender.set_overflow(true);
Self {
processed_info: Default::default(),
dependents: Default::default(),
file_transaction_lock: Default::default(),
status: None,
status_sender,
status_receiver,
}
}
}
impl ProcessorAssetInfo {
async fn update_status(&mut self, status: ProcessStatus) {
if self.status != Some(status) {
self.status = Some(status);
self.status_sender.broadcast(status).await.unwrap();
}
}
}
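/// Per-asset processor state, including reverse-dependency ("dependent") tracking for assets that
/// do not exist yet.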
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
}
impl ProcessorAssetInfos {
fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
self.infos.entry(asset_path.clone()).or_insert_with(|| {
let mut info = ProcessorAssetInfo::default();
if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
info.dependents = dependents;
}
info
})
}
pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
self.infos.get(asset_path)
}
fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
self.infos.get_mut(asset_path)
}
fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
if let Some(info) = self.get_mut(asset_path) {
info.dependents.insert(dependent);
} else {
let dependents = self
.non_existent_dependents
.entry(asset_path.clone())
.or_default();
dependents.insert(dependent);
}
}
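    /// Records the outcome of processing `asset_path`: updates its status and processed info,
    /// rewires dependency tracking, and (when the asset actually changed) queues its dependents
    /// for reprocessing.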
async fn finish_processing(
&mut self,
asset_path: AssetPath<'static>,
result: Result<ProcessResult, ProcessError>,
reprocess_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
match result {
Ok(ProcessResult::Processed(processed_info)) => {
debug!("Finished processing \"{}\"", asset_path);
let old_processed_info = self
.infos
.get_mut(&asset_path)
.and_then(|i| i.processed_info.take());
if let Some(old_processed_info) = old_processed_info {
self.clear_dependencies(&asset_path, old_processed_info);
}
for process_dependency_info in &processed_info.process_dependencies {
self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
}
let info = self.get_or_insert(asset_path);
info.processed_info = Some(processed_info);
info.update_status(ProcessStatus::Processed).await;
let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
for path in dependents {
let _ = reprocess_sender
.send((path.source().clone_owned(), path.path().to_owned()))
.await;
}
}
Ok(ProcessResult::SkippedNotChanged) => {
debug!("Skipping processing (unchanged) \"{}\"", asset_path);
let info = self.get_mut(&asset_path).expect("info should exist");
info.update_status(ProcessStatus::Processed).await;
}
Ok(ProcessResult::Ignored) => {
debug!("Skipping processing (ignored) \"{}\"", asset_path);
}
            Err(ProcessError::ExtensionRequired) => {
                // Skip assets without extensions: no processor or loader can be selected for them.
            }
Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
trace!("No loader found for {asset_path}");
}
Err(ProcessError::AssetReaderError {
err: AssetReaderError::NotFound(_),
..
}) => {
trace!("No need to process asset {asset_path} because it does not exist");
}
Err(err) => {
error!("Failed to process asset {asset_path}: {err}");
if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
err
{
let info = self.get_mut(&asset_path).expect("info should exist");
info.processed_info = Some(ProcessedInfo {
hash: AssetHash::default(),
full_hash: AssetHash::default(),
process_dependencies: vec![],
});
self.add_dependent(dependency.path(), asset_path.to_owned());
}
let info = self.get_mut(&asset_path).expect("info should exist");
info.update_status(ProcessStatus::Failed).await;
}
}
}
async fn remove(
&mut self,
asset_path: &AssetPath<'static>,
) -> Option<Arc<async_lock::RwLock<()>>> {
let info = self.infos.remove(asset_path)?;
if let Some(processed_info) = info.processed_info {
self.clear_dependencies(asset_path, processed_info);
}
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
if !info.dependents.is_empty() {
error!(
"The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
info.dependents
);
self.non_existent_dependents
.insert(asset_path.clone(), info.dependents);
}
Some(info.file_transaction_lock)
}
async fn rename(
&mut self,
old: &AssetPath<'static>,
new: &AssetPath<'static>,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
let mut info = self.infos.remove(old)?;
if !info.dependents.is_empty() {
error!(
"The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
info.dependents
);
self.non_existent_dependents
.insert(old.clone(), core::mem::take(&mut info.dependents));
}
if let Some(processed_info) = &info.processed_info {
for dep in &processed_info.process_dependencies {
if let Some(info) = self.infos.get_mut(&dep.path) {
info.dependents.remove(old);
info.dependents.insert(new.clone());
} else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
dependents.remove(old);
dependents.insert(new.clone());
}
}
}
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
let new_info = self.get_or_insert(new.clone());
new_info.processed_info = info.processed_info;
new_info.status = info.status;
if let Some(status) = new_info.status {
new_info.status_sender.broadcast(status).await.unwrap();
}
let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
let _ = new_task_sender
.send((new.source().clone_owned(), new.path().to_owned()))
.await;
for dependent in dependents {
let _ = new_task_sender
.send((
dependent.source().clone_owned(),
dependent.path().to_owned(),
))
.await;
}
Some((
info.file_transaction_lock,
new_info.file_transaction_lock.clone(),
))
}
fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
for old_load_dep in removed_info.process_dependencies {
if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
info.dependents.remove(asset_path);
} else if let Some(dependents) =
self.non_existent_dependents.get_mut(&old_load_dep.path)
{
dependents.remove(asset_path);
}
}
}
}
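/// The lifecycle phase of an [`AssetProcessor`]: scanning existing state, actively processing
/// assets, or finished with all queued work.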
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
Initializing,
Processing,
Finished,
}
#[derive(Error, Debug)]
pub enum InitializeError {
#[error(transparent)]
FailedToReadSourcePaths(AssetReaderError),
#[error(transparent)]
FailedToReadDestinationPaths(AssetReaderError),
#[error("Failed to validate asset log: {0}")]
ValidateLogError(#[from] ValidateLogError),
}
#[derive(Error, Debug)]
pub enum SetTransactionLogFactoryError {
#[error("Transaction log is already in use so setting the factory does nothing")]
AlreadyInUse,
}
#[derive(Error, Debug, PartialEq, Eq)]
pub enum GetProcessorError {
#[error("The processor '{0}' does not exist")]
Missing(String),
#[error("The processor '{processor_short_name}' is ambiguous between several processors: {ambiguous_processor_names:?}")]
Ambiguous {
processor_short_name: String,
ambiguous_processor_names: Vec<&'static str>,
},
}
impl From<GetProcessorError> for ProcessError {
fn from(err: GetProcessorError) -> Self {
match err {
GetProcessorError::Missing(name) => Self::MissingProcessor(name),
GetProcessorError::Ambiguous {
processor_short_name,
ambiguous_processor_names,
} => Self::AmbiguousProcessor {
processor_short_name,
ambiguous_processor_names,
},
}
}
}
#[cfg(test)]
mod tests;