Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_asset/src/processor/log.rs
6604 views
1
use crate::AssetPath;
2
use alloc::{
3
format,
4
string::{String, ToString},
5
vec::Vec,
6
};
7
use async_fs::File;
8
use bevy_platform::collections::HashSet;
9
use futures_lite::{AsyncReadExt, AsyncWriteExt};
10
use std::path::PathBuf;
11
use thiserror::Error;
12
use tracing::error;
13
14
/// An in-memory representation of a single [`ProcessorTransactionLog`] entry.
15
#[derive(Debug)]
16
pub(crate) enum LogEntry {
17
BeginProcessing(AssetPath<'static>),
18
EndProcessing(AssetPath<'static>),
19
UnrecoverableError,
20
}
21
22
/// A "write ahead" logger that helps ensure asset importing is transactional.
23
///
24
/// Prior to processing an asset, we write to the log to indicate it has started
25
/// After processing an asset, we write to the log to indicate it has finished.
26
/// On startup, the log can be read to determine if any transactions were incomplete.
27
// TODO: this should be a trait
28
pub struct ProcessorTransactionLog {
29
log_file: File,
30
}
31
32
/// An error that occurs when reading from the [`ProcessorTransactionLog`] fails.
33
#[derive(Error, Debug)]
34
pub enum ReadLogError {
35
/// An invalid log line was encountered, consisting of the contained string.
36
#[error("Encountered an invalid log line: '{0}'")]
37
InvalidLine(String),
38
/// A file-system-based error occurred while reading the log file.
39
#[error("Failed to read log file: {0}")]
40
Io(#[from] futures_io::Error),
41
}
42
43
/// An error that occurs when writing to the [`ProcessorTransactionLog`] fails.
44
#[derive(Error, Debug)]
45
#[error(
46
"Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
47
)]
48
pub struct WriteLogError {
49
log_entry: LogEntry,
50
error: futures_io::Error,
51
}
52
53
/// An error that occurs when validating the [`ProcessorTransactionLog`] fails.
54
#[derive(Error, Debug)]
55
pub enum ValidateLogError {
56
/// An error that could not be recovered from. All assets will be reprocessed.
57
#[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
58
UnrecoverableError,
59
/// A [`ReadLogError`].
60
#[error(transparent)]
61
ReadLogError(#[from] ReadLogError),
62
/// Duplicated process asset transactions occurred.
63
#[error("Encountered a duplicate process asset transaction: {0:?}")]
64
EntryErrors(Vec<LogEntryError>),
65
}
66
67
/// An error that occurs when validating individual [`ProcessorTransactionLog`] entries.
68
#[derive(Error, Debug)]
69
pub enum LogEntryError {
70
/// A duplicate process asset transaction occurred for the given asset path.
71
#[error("Encountered a duplicate process asset transaction: {0}")]
72
DuplicateTransaction(AssetPath<'static>),
73
/// A transaction was ended that never started for the given asset path.
74
#[error("A transaction was ended that never started {0}")]
75
EndedMissingTransaction(AssetPath<'static>),
76
/// An asset started processing but never finished at the given asset path.
77
#[error("An asset started processing but never finished: {0}")]
78
UnfinishedTransaction(AssetPath<'static>),
79
}
80
81
/// Location of the log file, relative to the base path.
const LOG_PATH: &str = "imported_assets/log";
/// Line prefix marking the start of an asset's processing; the asset path follows it.
const ENTRY_BEGIN: &str = "Begin ";
/// Line prefix marking the end of an asset's processing; the asset path follows it.
const ENTRY_END: &str = "End ";
/// Marker text written to the log when an unrecoverable error is recorded.
const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
85
86
impl ProcessorTransactionLog {
87
fn full_log_path() -> PathBuf {
88
#[cfg(not(target_arch = "wasm32"))]
89
let base_path = crate::io::file::get_base_path();
90
#[cfg(target_arch = "wasm32")]
91
let base_path = PathBuf::new();
92
base_path.join(LOG_PATH)
93
}
94
/// Create a new, fresh log file. This will delete the previous log file if it exists.
95
pub(crate) async fn new() -> Result<Self, futures_io::Error> {
96
let path = Self::full_log_path();
97
match async_fs::remove_file(&path).await {
98
Ok(_) => { /* successfully removed file */ }
99
Err(err) => {
100
// if the log file is not found, we assume we are starting in a fresh (or good) state
101
if err.kind() != futures_io::ErrorKind::NotFound {
102
error!("Failed to remove previous log file {}", err);
103
}
104
}
105
}
106
107
if let Some(parent_folder) = path.parent() {
108
async_fs::create_dir_all(parent_folder).await?;
109
}
110
111
Ok(Self {
112
log_file: File::create(path).await?,
113
})
114
}
115
116
pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
117
let mut log_lines = Vec::new();
118
let mut file = match File::open(Self::full_log_path()).await {
119
Ok(file) => file,
120
Err(err) => {
121
if err.kind() == futures_io::ErrorKind::NotFound {
122
// if the log file doesn't exist, this is equivalent to an empty file
123
return Ok(log_lines);
124
}
125
return Err(err.into());
126
}
127
};
128
let mut string = String::new();
129
file.read_to_string(&mut string).await?;
130
for line in string.lines() {
131
if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
132
log_lines.push(LogEntry::BeginProcessing(
133
AssetPath::parse(path_str).into_owned(),
134
));
135
} else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
136
log_lines.push(LogEntry::EndProcessing(
137
AssetPath::parse(path_str).into_owned(),
138
));
139
} else if line.is_empty() {
140
continue;
141
} else {
142
return Err(ReadLogError::InvalidLine(line.to_string()));
143
}
144
}
145
Ok(log_lines)
146
}
147
148
pub(crate) async fn validate() -> Result<(), ValidateLogError> {
149
let mut transactions: HashSet<AssetPath<'static>> = Default::default();
150
let mut errors: Vec<LogEntryError> = Vec::new();
151
let entries = Self::read().await?;
152
for entry in entries {
153
match entry {
154
LogEntry::BeginProcessing(path) => {
155
// There should never be duplicate "start transactions" in a log
156
// Every start should be followed by:
157
// * nothing (if there was an abrupt stop)
158
// * an End (if the transaction was completed)
159
if !transactions.insert(path.clone()) {
160
errors.push(LogEntryError::DuplicateTransaction(path));
161
}
162
}
163
LogEntry::EndProcessing(path) => {
164
if !transactions.remove(&path) {
165
errors.push(LogEntryError::EndedMissingTransaction(path));
166
}
167
}
168
LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
169
}
170
}
171
for transaction in transactions {
172
errors.push(LogEntryError::UnfinishedTransaction(transaction));
173
}
174
if !errors.is_empty() {
175
return Err(ValidateLogError::EntryErrors(errors));
176
}
177
Ok(())
178
}
179
180
/// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`ProcessorTransactionLog::end_processing`],
181
/// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
182
pub(crate) async fn begin_processing(
183
&mut self,
184
path: &AssetPath<'_>,
185
) -> Result<(), WriteLogError> {
186
self.write(&format!("{ENTRY_BEGIN}{path}\n"))
187
.await
188
.map_err(|e| WriteLogError {
189
log_entry: LogEntry::BeginProcessing(path.clone_owned()),
190
error: e,
191
})
192
}
193
194
/// Logs the end of an asset being successfully processed. See [`ProcessorTransactionLog::begin_processing`].
195
pub(crate) async fn end_processing(
196
&mut self,
197
path: &AssetPath<'_>,
198
) -> Result<(), WriteLogError> {
199
self.write(&format!("{ENTRY_END}{path}\n"))
200
.await
201
.map_err(|e| WriteLogError {
202
log_entry: LogEntry::EndProcessing(path.clone_owned()),
203
error: e,
204
})
205
}
206
207
/// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
208
/// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
209
pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
210
self.write(UNRECOVERABLE_ERROR)
211
.await
212
.map_err(|e| WriteLogError {
213
log_entry: LogEntry::UnrecoverableError,
214
error: e,
215
})
216
}
217
218
/// Writes `line` to the log file as-is and then flushes the file.
///
/// No line terminator is added; callers include one in `line` if they want it.
///
/// # Errors
///
/// Returns the I/O error from either the write or the flush.
async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
    let bytes = line.as_bytes();
    self.log_file.write_all(bytes).await?;
    self.log_file.flush().await
}
223
}
224
225