Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_asset/src/processor/log.rs
9354 views
1
use crate::AssetPath;
2
use alloc::{
3
boxed::Box,
4
format,
5
string::{String, ToString},
6
vec::Vec,
7
};
8
use async_fs::File;
9
use bevy_ecs::error::BevyError;
10
use bevy_platform::collections::HashSet;
11
use bevy_tasks::BoxedFuture;
12
use futures_lite::{AsyncReadExt, AsyncWriteExt};
13
use std::path::PathBuf;
14
use thiserror::Error;
15
use tracing::error;
16
17
/// An in-memory representation of a single [`ProcessorTransactionLog`] entry.
18
#[derive(Debug)]
19
pub enum LogEntry {
20
BeginProcessing(AssetPath<'static>),
21
EndProcessing(AssetPath<'static>),
22
UnrecoverableError,
23
}
24
25
/// A factory of [`ProcessorTransactionLog`] that handles the state before the log has been started.
26
///
27
/// This trait also assists in recovering from partial processing by fetching the previous state of
28
/// the transaction log.
29
pub trait ProcessorTransactionLogFactory: Send + Sync + 'static {
30
/// Reads all entries in a previous transaction log if present.
31
///
32
/// If there is no previous transaction log, this method should return an empty Vec of entries.
33
fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>>;
34
35
/// Creates a new transaction log to write to.
36
///
37
/// This should remove any previous entries if they exist.
38
fn create_new_log(
39
&self,
40
) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>>;
41
}
42
43
/// A "write ahead" logger that helps ensure asset importing is transactional.
44
///
45
/// Prior to processing an asset, we write to the log to indicate it has started. After processing
46
/// an asset, we write to the log to indicate it has finished. On startup, the log can be read
47
/// through [`ProcessorTransactionLogFactory`] to determine if any transactions were incomplete.
48
pub trait ProcessorTransactionLog: Send + Sync + 'static {
49
/// Logs the start of an asset being processed.
50
///
51
/// If this is not followed at some point in the log by a closing
52
/// [`ProcessorTransactionLog::end_processing`], in the next run of the processor the asset
53
/// processing will be considered "incomplete" and it will be reprocessed.
54
fn begin_processing<'a>(
55
&'a mut self,
56
asset: &'a AssetPath<'_>,
57
) -> BoxedFuture<'a, Result<(), BevyError>>;
58
59
/// Logs the end of an asset being successfully processed. See
60
/// [`ProcessorTransactionLog::begin_processing`].
61
fn end_processing<'a>(
62
&'a mut self,
63
asset: &'a AssetPath<'_>,
64
) -> BoxedFuture<'a, Result<(), BevyError>>;
65
66
/// Logs an unrecoverable error.
67
///
68
/// On the next run of the processor, all assets will be regenerated. This should only be used
69
/// as a last resort. Every call to this should be considered with scrutiny and ideally replaced
70
/// with something more granular.
71
fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>>;
72
}
73
74
/// Validate the previous state of the transaction log and determine any assets that need to be
75
/// reprocessed.
76
pub(crate) async fn validate_transaction_log(
77
log_factory: &dyn ProcessorTransactionLogFactory,
78
) -> Result<(), ValidateLogError> {
79
let mut transactions: HashSet<AssetPath<'static>> = Default::default();
80
let mut errors: Vec<LogEntryError> = Vec::new();
81
let entries = log_factory
82
.read()
83
.await
84
.map_err(ValidateLogError::ReadLogError)?;
85
for entry in entries {
86
match entry {
87
LogEntry::BeginProcessing(path) => {
88
// There should never be duplicate "start transactions" in a log
89
// Every start should be followed by:
90
// * nothing (if there was an abrupt stop)
91
// * an End (if the transaction was completed)
92
if !transactions.insert(path.clone()) {
93
errors.push(LogEntryError::DuplicateTransaction(path));
94
}
95
}
96
LogEntry::EndProcessing(path) => {
97
if !transactions.remove(&path) {
98
errors.push(LogEntryError::EndedMissingTransaction(path));
99
}
100
}
101
LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
102
}
103
}
104
for transaction in transactions {
105
errors.push(LogEntryError::UnfinishedTransaction(transaction));
106
}
107
if !errors.is_empty() {
108
return Err(ValidateLogError::EntryErrors(errors));
109
}
110
Ok(())
111
}
112
113
/// A transaction log factory that uses a file as its storage.
114
pub struct FileTransactionLogFactory {
115
/// The file path that the transaction log should write to.
116
pub file_path: PathBuf,
117
}
118
119
const LOG_PATH: &str = "imported_assets/log";
120
121
impl Default for FileTransactionLogFactory {
122
fn default() -> Self {
123
#[cfg(not(target_arch = "wasm32"))]
124
let base_path = crate::io::file::get_base_path();
125
#[cfg(target_arch = "wasm32")]
126
let base_path = PathBuf::new();
127
let file_path = base_path.join(LOG_PATH);
128
Self { file_path }
129
}
130
}
131
132
impl ProcessorTransactionLogFactory for FileTransactionLogFactory {
133
fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>> {
134
let path = self.file_path.clone();
135
Box::pin(async move {
136
let mut log_lines = Vec::new();
137
let mut file = match File::open(path).await {
138
Ok(file) => file,
139
Err(err) => {
140
if err.kind() == futures_io::ErrorKind::NotFound {
141
// if the log file doesn't exist, this is equivalent to an empty file
142
return Ok(log_lines);
143
}
144
return Err(err.into());
145
}
146
};
147
let mut string = String::new();
148
file.read_to_string(&mut string).await?;
149
for line in string.lines() {
150
if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
151
log_lines.push(LogEntry::BeginProcessing(
152
AssetPath::parse(path_str).into_owned(),
153
));
154
} else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
155
log_lines.push(LogEntry::EndProcessing(
156
AssetPath::parse(path_str).into_owned(),
157
));
158
} else if line.is_empty() {
159
continue;
160
} else {
161
return Err(ReadLogError::InvalidLine(line.to_string()).into());
162
}
163
}
164
Ok(log_lines)
165
})
166
}
167
168
fn create_new_log(
169
&self,
170
) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>> {
171
let path = self.file_path.clone();
172
Box::pin(async move {
173
match async_fs::remove_file(&path).await {
174
Ok(_) => { /* successfully removed file */ }
175
Err(err) => {
176
// if the log file is not found, we assume we are starting in a fresh (or good) state
177
if err.kind() != futures_io::ErrorKind::NotFound {
178
error!("Failed to remove previous log file {}", err);
179
}
180
}
181
}
182
183
if let Some(parent_folder) = path.parent() {
184
async_fs::create_dir_all(parent_folder).await?;
185
}
186
187
Ok(Box::new(FileProcessorTransactionLog {
188
log_file: File::create(path).await?,
189
}) as _)
190
})
191
}
192
}
193
194
/// A "write ahead" logger that helps ensure asset importing is transactional.
195
///
196
/// Prior to processing an asset, we write to the log to indicate it has started
197
/// After processing an asset, we write to the log to indicate it has finished.
198
/// On startup, the log can be read to determine if any transactions were incomplete.
199
struct FileProcessorTransactionLog {
200
/// The file to write logs to.
201
log_file: File,
202
}
203
204
impl FileProcessorTransactionLog {
205
/// Write `line` to the file and flush it.
206
async fn write(&mut self, line: &str) -> Result<(), BevyError> {
207
self.log_file.write_all(line.as_bytes()).await?;
208
self.log_file.flush().await?;
209
Ok(())
210
}
211
}
212
213
const ENTRY_BEGIN: &str = "Begin ";
214
const ENTRY_END: &str = "End ";
215
const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
216
217
impl ProcessorTransactionLog for FileProcessorTransactionLog {
218
fn begin_processing<'a>(
219
&'a mut self,
220
asset: &'a AssetPath<'_>,
221
) -> BoxedFuture<'a, Result<(), BevyError>> {
222
Box::pin(async move { self.write(&format!("{ENTRY_BEGIN}{asset}\n")).await })
223
}
224
225
fn end_processing<'a>(
226
&'a mut self,
227
asset: &'a AssetPath<'_>,
228
) -> BoxedFuture<'a, Result<(), BevyError>> {
229
Box::pin(async move { self.write(&format!("{ENTRY_END}{asset}\n")).await })
230
}
231
232
fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>> {
233
Box::pin(async move { self.write(UNRECOVERABLE_ERROR).await })
234
}
235
}
236
237
/// An error that occurs when reading from the [`ProcessorTransactionLog`] fails.
238
#[derive(Error, Debug)]
239
pub enum ReadLogError {
240
/// An invalid log line was encountered, consisting of the contained string.
241
#[error("Encountered an invalid log line: '{0}'")]
242
InvalidLine(String),
243
/// A file-system-based error occurred while reading the log file.
244
#[error("Failed to read log file: {0}")]
245
Io(#[from] futures_io::Error),
246
}
247
248
/// An error that occurs when writing to the [`ProcessorTransactionLog`] fails.
249
#[derive(Error, Debug)]
250
#[error(
251
"Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
252
)]
253
pub(crate) struct WriteLogError {
254
pub(crate) log_entry: LogEntry,
255
pub(crate) error: BevyError,
256
}
257
258
/// An error that occurs when validating the [`ProcessorTransactionLog`] fails.
259
#[derive(Error, Debug)]
260
pub enum ValidateLogError {
261
/// An error that could not be recovered from. All assets will be reprocessed.
262
#[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
263
UnrecoverableError,
264
/// A [`ReadLogError`].
265
#[error("Failed to read log entries: {0}")]
266
ReadLogError(BevyError),
267
/// Duplicated process asset transactions occurred.
268
#[error("Encountered a duplicate process asset transaction: {0:?}")]
269
EntryErrors(Vec<LogEntryError>),
270
}
271
272
/// An error that occurs when validating individual [`ProcessorTransactionLog`] entries.
273
#[derive(Error, Debug)]
274
pub enum LogEntryError {
275
/// A duplicate process asset transaction occurred for the given asset path.
276
#[error("Encountered a duplicate process asset transaction: {0}")]
277
DuplicateTransaction(AssetPath<'static>),
278
/// A transaction was ended that never started for the given asset path.
279
#[error("A transaction was ended that never started {0}")]
280
EndedMissingTransaction(AssetPath<'static>),
281
/// An asset started processing but never finished at the given asset path.
282
#[error("An asset started processing but never finished: {0}")]
283
UnfinishedTransaction(AssetPath<'static>),
284
}
285
286