use crate::AssetPath;
use alloc::{
boxed::Box,
format,
string::{String, ToString},
vec::Vec,
};
use async_fs::File;
use bevy_ecs::error::BevyError;
use bevy_platform::collections::HashSet;
use bevy_tasks::BoxedFuture;
use futures_lite::{AsyncReadExt, AsyncWriteExt};
use std::path::PathBuf;
use thiserror::Error;
use tracing::error;
#[derive(Debug)]
pub enum LogEntry {
BeginProcessing(AssetPath<'static>),
EndProcessing(AssetPath<'static>),
UnrecoverableError,
}
pub trait ProcessorTransactionLogFactory: Send + Sync + 'static {
fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>>;
fn create_new_log(
&self,
) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>>;
}
pub trait ProcessorTransactionLog: Send + Sync + 'static {
fn begin_processing<'a>(
&'a mut self,
asset: &'a AssetPath<'_>,
) -> BoxedFuture<'a, Result<(), BevyError>>;
fn end_processing<'a>(
&'a mut self,
asset: &'a AssetPath<'_>,
) -> BoxedFuture<'a, Result<(), BevyError>>;
fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>>;
}
pub(crate) async fn validate_transaction_log(
log_factory: &dyn ProcessorTransactionLogFactory,
) -> Result<(), ValidateLogError> {
let mut transactions: HashSet<AssetPath<'static>> = Default::default();
let mut errors: Vec<LogEntryError> = Vec::new();
let entries = log_factory
.read()
.await
.map_err(ValidateLogError::ReadLogError)?;
for entry in entries {
match entry {
LogEntry::BeginProcessing(path) => {
if !transactions.insert(path.clone()) {
errors.push(LogEntryError::DuplicateTransaction(path));
}
}
LogEntry::EndProcessing(path) => {
if !transactions.remove(&path) {
errors.push(LogEntryError::EndedMissingTransaction(path));
}
}
LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
}
}
for transaction in transactions {
errors.push(LogEntryError::UnfinishedTransaction(transaction));
}
if !errors.is_empty() {
return Err(ValidateLogError::EntryErrors(errors));
}
Ok(())
}
pub struct FileTransactionLogFactory {
pub file_path: PathBuf,
}
const LOG_PATH: &str = "imported_assets/log";
impl Default for FileTransactionLogFactory {
fn default() -> Self {
#[cfg(not(target_arch = "wasm32"))]
let base_path = crate::io::file::get_base_path();
#[cfg(target_arch = "wasm32")]
let base_path = PathBuf::new();
let file_path = base_path.join(LOG_PATH);
Self { file_path }
}
}
impl ProcessorTransactionLogFactory for FileTransactionLogFactory {
fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>> {
let path = self.file_path.clone();
Box::pin(async move {
let mut log_lines = Vec::new();
let mut file = match File::open(path).await {
Ok(file) => file,
Err(err) => {
if err.kind() == futures_io::ErrorKind::NotFound {
return Ok(log_lines);
}
return Err(err.into());
}
};
let mut string = String::new();
file.read_to_string(&mut string).await?;
for line in string.lines() {
if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
log_lines.push(LogEntry::BeginProcessing(
AssetPath::parse(path_str).into_owned(),
));
} else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
log_lines.push(LogEntry::EndProcessing(
AssetPath::parse(path_str).into_owned(),
));
} else if line.is_empty() {
continue;
} else {
return Err(ReadLogError::InvalidLine(line.to_string()).into());
}
}
Ok(log_lines)
})
}
fn create_new_log(
&self,
) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>> {
let path = self.file_path.clone();
Box::pin(async move {
match async_fs::remove_file(&path).await {
Ok(_) => { }
Err(err) => {
if err.kind() != futures_io::ErrorKind::NotFound {
error!("Failed to remove previous log file {}", err);
}
}
}
if let Some(parent_folder) = path.parent() {
async_fs::create_dir_all(parent_folder).await?;
}
Ok(Box::new(FileProcessorTransactionLog {
log_file: File::create(path).await?,
}) as _)
})
}
}
struct FileProcessorTransactionLog {
log_file: File,
}
impl FileProcessorTransactionLog {
async fn write(&mut self, line: &str) -> Result<(), BevyError> {
self.log_file.write_all(line.as_bytes()).await?;
self.log_file.flush().await?;
Ok(())
}
}
const ENTRY_BEGIN: &str = "Begin ";
const ENTRY_END: &str = "End ";
const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
impl ProcessorTransactionLog for FileProcessorTransactionLog {
fn begin_processing<'a>(
&'a mut self,
asset: &'a AssetPath<'_>,
) -> BoxedFuture<'a, Result<(), BevyError>> {
Box::pin(async move { self.write(&format!("{ENTRY_BEGIN}{asset}\n")).await })
}
fn end_processing<'a>(
&'a mut self,
asset: &'a AssetPath<'_>,
) -> BoxedFuture<'a, Result<(), BevyError>> {
Box::pin(async move { self.write(&format!("{ENTRY_END}{asset}\n")).await })
}
fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>> {
Box::pin(async move { self.write(UNRECOVERABLE_ERROR).await })
}
}
#[derive(Error, Debug)]
pub enum ReadLogError {
#[error("Encountered an invalid log line: '{0}'")]
InvalidLine(String),
#[error("Failed to read log file: {0}")]
Io(#[from] futures_io::Error),
}
#[derive(Error, Debug)]
#[error(
"Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
)]
pub(crate) struct WriteLogError {
pub(crate) log_entry: LogEntry,
pub(crate) error: BevyError,
}
#[derive(Error, Debug)]
pub enum ValidateLogError {
#[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
UnrecoverableError,
#[error("Failed to read log entries: {0}")]
ReadLogError(BevyError),
#[error("Encountered a duplicate process asset transaction: {0:?}")]
EntryErrors(Vec<LogEntryError>),
}
#[derive(Error, Debug)]
pub enum LogEntryError {
#[error("Encountered a duplicate process asset transaction: {0}")]
DuplicateTransaction(AssetPath<'static>),
#[error("A transaction was ended that never started {0}")]
EndedMissingTransaction(AssetPath<'static>),
#[error("An asset started processing but never finished: {0}")]
UnfinishedTransaction(AssetPath<'static>),
}