Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/devices/src/virtio/video/utils.rs
5394 views
1
// Copyright 2022 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
// Not all video backends make use of the tools in this module.
6
#![allow(dead_code)]
7
8
use std::collections::btree_map::Entry;
9
use std::collections::BTreeMap;
10
use std::collections::VecDeque;
11
use std::time::Duration;
12
13
use base::AsRawDescriptor;
14
use base::Event;
15
use base::EventExt;
16
use sync::Mutex;
17
use thiserror::Error as ThisError;
18
19
use crate::virtio::video::resource::GuestResource;
20
21
/// Manages a pollable queue of events to be sent to the decoder or encoder.
22
pub struct EventQueue<T> {
23
/// Pipe used to signal available events.
24
event: Event,
25
/// FIFO of all pending events.
26
pending_events: VecDeque<T>,
27
}
28
29
impl<T> EventQueue<T> {
30
/// Create a new event queue.
31
pub fn new() -> base::Result<Self> {
32
Ok(Self {
33
// Use semaphore semantics so `eventfd` can be `read` as many times as it has been
34
// `write`n to without blocking.
35
event: Event::new()?,
36
pending_events: Default::default(),
37
})
38
}
39
40
/// Add `event` to the queue.
41
pub fn queue_event(&mut self, event: T) -> base::Result<()> {
42
self.pending_events.push_back(event);
43
self.event.write_count(1)?;
44
Ok(())
45
}
46
47
/// Read the next event, blocking until an event becomes available.
48
pub fn dequeue_event(&mut self) -> base::Result<T> {
49
// Wait until at least one event is written, if necessary.
50
let cpt = self.event.read_count()?;
51
let event = match self.pending_events.pop_front() {
52
Some(event) => event,
53
None => panic!("event signaled but no pending event - this is a bug."),
54
};
55
// If we have more than one event pending, write the remainder back into the event so it
56
// keeps signalling.
57
if cpt > 1 {
58
self.event.write_count(cpt - 1)?;
59
}
60
61
Ok(event)
62
}
63
64
/// Remove all the posted events for which `predicate` returns `false`.
65
pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) {
66
if !self.pending_events.is_empty() {
67
let _ = self
68
.event
69
.wait_timeout(Duration::from_millis(0))
70
.expect("wait_timeout failure");
71
}
72
73
self.pending_events.retain(predicate);
74
75
let num_pending_events = self.pending_events.len();
76
if num_pending_events > 0 {
77
self.event
78
.write_count(num_pending_events as u64)
79
.expect("write failure");
80
}
81
}
82
83
/// Returns the number of events currently pending on this queue, i.e. the number of times
84
/// `dequeue_event` can be called without blocking.
85
#[cfg(test)]
86
pub fn len(&self) -> usize {
87
self.pending_events.len()
88
}
89
}
90
91
impl<T> AsRawDescriptor for EventQueue<T> {
92
fn as_raw_descriptor(&self) -> base::RawDescriptor {
93
self.event.as_raw_descriptor()
94
}
95
}
96
97
/// An `EventQueue` that is `Sync`, `Send`, and non-mut - i.e. that can easily be passed across
98
/// threads and wrapped into a `Rc` or `Arc`.
99
pub struct SyncEventQueue<T>(Mutex<EventQueue<T>>);
100
101
impl<T> From<EventQueue<T>> for SyncEventQueue<T> {
102
fn from(queue: EventQueue<T>) -> Self {
103
Self(Mutex::new(queue))
104
}
105
}
106
107
impl<T> SyncEventQueue<T> {
108
/// Add `event` to the queue.
109
pub fn queue_event(&self, event: T) -> base::Result<()> {
110
self.0.lock().queue_event(event)
111
}
112
113
/// Read the next event, blocking until an event becomes available.
114
pub fn dequeue_event(&self) -> base::Result<T> {
115
self.0.lock().dequeue_event()
116
}
117
118
/// Remove all the posted events for which `predicate` returns `false`.
119
pub fn retain<P: FnMut(&T) -> bool>(&self, predicate: P) {
120
self.0.lock().retain(predicate)
121
}
122
123
/// Returns the number of events currently pending on this queue, i.e. the number of times
124
/// `dequeue_event` can be called without blocking.
125
#[cfg(test)]
126
pub fn len(&self) -> usize {
127
self.0.lock().len()
128
}
129
}
130
131
impl<T> AsRawDescriptor for SyncEventQueue<T> {
132
fn as_raw_descriptor(&self) -> base::RawDescriptor {
133
self.0.lock().as_raw_descriptor()
134
}
135
}
136
137
/// Queue of all the output buffers provided by crosvm.
138
pub struct OutputQueue {
139
// Max number of output buffers that can be imported into this queue.
140
num_buffers: usize,
141
// Maps picture IDs to the corresponding guest resource.
142
buffers: BTreeMap<u32, GuestResource>,
143
// Picture IDs of output buffers we can write into.
144
ready_buffers: VecDeque<u32>,
145
}
146
147
#[derive(Debug, ThisError)]
148
pub enum OutputBufferImportError {
149
#[error("maximum number of imported buffers ({0}) already reached")]
150
MaxBuffersReached(usize),
151
#[error("a buffer with picture ID {0} is already imported")]
152
AlreadyImported(u32),
153
}
154
155
#[derive(Debug, ThisError)]
156
pub enum OutputBufferReuseError {
157
#[error("no buffer with picture ID {0} is imported at the moment")]
158
NotYetImported(u32),
159
#[error("buffer with picture ID {0} is already ready for use")]
160
AlreadyUsed(u32),
161
}
162
163
impl OutputQueue {
164
/// Creates a new output queue capable of containing `num_buffers` buffers.
165
pub fn new(num_buffers: usize) -> Self {
166
Self {
167
num_buffers,
168
buffers: Default::default(),
169
ready_buffers: Default::default(),
170
}
171
}
172
173
/// Import a buffer, i.e. associate the buffer's `resource` to a given `picture_buffer_id`, and
174
/// make the buffer ready for use.
175
///
176
/// A buffer with a given `picture_buffer_id` can only be imported once.
177
pub fn import_buffer(
178
&mut self,
179
picture_buffer_id: u32,
180
resource: GuestResource,
181
) -> Result<(), OutputBufferImportError> {
182
if self.buffers.len() >= self.num_buffers {
183
return Err(OutputBufferImportError::MaxBuffersReached(self.num_buffers));
184
}
185
186
match self.buffers.entry(picture_buffer_id) {
187
Entry::Vacant(o) => {
188
o.insert(resource);
189
}
190
Entry::Occupied(_) => {
191
return Err(OutputBufferImportError::AlreadyImported(picture_buffer_id));
192
}
193
}
194
195
self.ready_buffers.push_back(picture_buffer_id);
196
197
Ok(())
198
}
199
200
/// Mark the previously-imported buffer with ID `picture_buffer_id` as ready for being used.
201
pub fn reuse_buffer(&mut self, picture_buffer_id: u32) -> Result<(), OutputBufferReuseError> {
202
if !self.buffers.contains_key(&picture_buffer_id) {
203
return Err(OutputBufferReuseError::NotYetImported(picture_buffer_id));
204
}
205
206
if self.ready_buffers.contains(&picture_buffer_id) {
207
return Err(OutputBufferReuseError::AlreadyUsed(picture_buffer_id));
208
}
209
210
self.ready_buffers.push_back(picture_buffer_id);
211
212
Ok(())
213
}
214
215
/// Get a buffer ready to be decoded into, if any is available.
216
pub fn try_get_ready_buffer(&mut self) -> Option<(u32, &mut GuestResource)> {
217
let picture_buffer_id = self.ready_buffers.pop_front()?;
218
// Unwrapping is safe here because our interface guarantees that ids in `ready_buffers` are
219
// valid keys for `buffers`.
220
Some((
221
picture_buffer_id,
222
self.buffers
223
.get_mut(&picture_buffer_id)
224
.expect("expected buffer not present in queue"),
225
))
226
}
227
228
pub fn clear_ready_buffers(&mut self) {
229
self.ready_buffers.clear();
230
}
231
}
232
233
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use base::EventToken;
    use base::WaitContext;

    use super::*;
    use crate::virtio::video::error::VideoError;
    use crate::virtio::video::error::VideoResult;
    use crate::virtio::video::format::Rect;

    /// This is the same as DecoderEvent but copied here so that the test can be compiled
    /// without depending on the "video-decoder" feature.
    #[derive(Debug)]
    pub enum TestEvent {
        #[allow(dead_code)]
        ProvidePictureBuffers {
            min_num_buffers: u32,
            width: i32,
            height: i32,
            visible_rect: Rect,
        },
        PictureReady {
            picture_buffer_id: i32,
            timestamp: u64,
            visible_rect: Rect,
        },
        NotifyEndOfBitstreamBuffer(u32),
        #[allow(dead_code)]
        NotifyError(VideoError),
        #[allow(dead_code)]
        FlushCompleted(VideoResult<()>),
        #[allow(dead_code)]
        ResetCompleted(VideoResult<()>),
    }

    /// Checks that `EventQueue` hands events back in the order they were queued, and that
    /// `len` tracks the number of pending events.
    #[test]
    fn event_queue() {
        let mut event_queue = EventQueue::new().unwrap();

        let queued = event_queue.queue_event(TestEvent::NotifyEndOfBitstreamBuffer(1));
        assert_eq!(queued, Ok(()));
        assert_eq!(event_queue.len(), 1);

        let queued = event_queue.queue_event(TestEvent::PictureReady {
            picture_buffer_id: 0,
            timestamp: 42,
            visible_rect: Rect {
                left: 0,
                top: 0,
                right: 320,
                bottom: 240,
            },
        });
        assert_eq!(queued, Ok(()));
        assert_eq!(event_queue.len(), 2);

        let first = event_queue.dequeue_event();
        assert!(matches!(
            first,
            Ok(TestEvent::NotifyEndOfBitstreamBuffer(1))
        ));
        assert_eq!(event_queue.len(), 1);

        let second = event_queue.dequeue_event();
        assert!(matches!(
            second,
            Ok(TestEvent::PictureReady {
                picture_buffer_id: 0,
                timestamp: 42,
                visible_rect: Rect {
                    left: 0,
                    top: 0,
                    right: 320,
                    bottom: 240,
                }
            })
        ));
        assert_eq!(event_queue.len(), 0);
    }

    /// Checks that the descriptor exposed by `EventQueue` polls as signaled exactly when the
    /// queue holds at least one event.
    #[test]
    fn decoder_event_queue_polling() {
        #[derive(EventToken)]
        enum Token {
            Event,
        }

        let mut event_queue = EventQueue::new().unwrap();
        let wait_context = WaitContext::build_with(&[(&event_queue, Token::Event)]).unwrap();

        // Number of tokens reported by a non-blocking poll of the wait context.
        let signaled_count = || wait_context.wait_timeout(Duration::ZERO).unwrap().len();

        // Nothing has been queued yet, so the queue must not be signaled.
        assert_eq!(signaled_count(), 0);

        // The queue must stay signaled for as long as it holds at least one event.
        for id in 1..=3 {
            event_queue
                .queue_event(TestEvent::NotifyEndOfBitstreamBuffer(id))
                .unwrap();
            assert_eq!(signaled_count(), 1);
        }

        // Still signaled after the first two dequeues; no longer signaled once the third one
        // has emptied the queue.
        for expected in [1, 1, 0] {
            event_queue.dequeue_event().unwrap();
            assert_eq!(signaled_count(), expected);
        }
    }
}
354
355