Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/devices/src/virtio/video/worker.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
//! Worker that runs in a virtio-video thread.
6
7
use std::collections::VecDeque;
8
use std::time::Duration;
9
10
use base::clone_descriptor;
11
use base::error;
12
use base::info;
13
use base::Event;
14
use base::WaitContext;
15
use cros_async::select3;
16
use cros_async::AsyncWrapper;
17
use cros_async::EventAsync;
18
use cros_async::Executor;
19
use cros_async::SelectResult;
20
use futures::FutureExt;
21
22
use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
23
use crate::virtio::video::command::QueueType;
24
use crate::virtio::video::command::VideoCmd;
25
use crate::virtio::video::device::AsyncCmdResponse;
26
use crate::virtio::video::device::AsyncCmdTag;
27
use crate::virtio::video::device::Device;
28
use crate::virtio::video::device::Token;
29
use crate::virtio::video::device::VideoCmdResponseType;
30
use crate::virtio::video::device::VideoEvtResponseType;
31
use crate::virtio::video::event;
32
use crate::virtio::video::event::EvtType;
33
use crate::virtio::video::event::VideoEvt;
34
use crate::virtio::video::response;
35
use crate::virtio::video::response::Response;
36
use crate::virtio::video::Error;
37
use crate::virtio::video::Result;
38
use crate::virtio::DescriptorChain;
39
use crate::virtio::Queue;
40
41
/// Worker that takes care of running the virtio video device.
42
pub struct Worker {
43
/// VirtIO queue for Command queue
44
cmd_queue: Queue,
45
/// VirtIO queue for Event queue
46
event_queue: Queue,
47
/// Stores descriptor chains in which responses for asynchronous commands will be written
48
desc_map: AsyncCmdDescMap,
49
}
50
51
/// Pair of a descriptor chain and a response to be written.
52
type WritableResp = (DescriptorChain, response::CmdResponse);
53
54
impl Worker {
55
pub fn new(cmd_queue: Queue, event_queue: Queue) -> Self {
56
Self {
57
cmd_queue,
58
event_queue,
59
desc_map: Default::default(),
60
}
61
}
62
63
/// Writes responses into the command queue.
64
fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> {
65
if responses.is_empty() {
66
return Ok(());
67
}
68
while let Some((mut desc, response)) = responses.pop_front() {
69
if let Err(e) = response.write(&mut desc.writer) {
70
error!(
71
"failed to write a command response for {:?}: {}",
72
response, e
73
);
74
}
75
self.cmd_queue.add_used(desc);
76
}
77
self.cmd_queue.trigger_interrupt();
78
Ok(())
79
}
80
81
/// Writes a `VideoEvt` into the event queue.
82
fn write_event(&mut self, event: event::VideoEvt) -> Result<()> {
83
let mut desc = self
84
.event_queue
85
.pop()
86
.ok_or(Error::DescriptorNotAvailable)?;
87
88
event
89
.write(&mut desc.writer)
90
.map_err(|error| Error::WriteEventFailure { event, error })?;
91
self.event_queue.add_used(desc);
92
self.event_queue.trigger_interrupt();
93
Ok(())
94
}
95
96
/// Writes the `event_responses` into the command queue or the event queue according to
97
/// each response's type.
98
///
99
/// # Arguments
100
///
101
/// * `event_responses` - Responses to write
102
/// * `stream_id` - Stream session ID of the responses
103
fn write_event_responses(
104
&mut self,
105
event_responses: Vec<VideoEvtResponseType>,
106
stream_id: u32,
107
) -> Result<()> {
108
let mut responses: VecDeque<WritableResp> = Default::default();
109
for event_response in event_responses {
110
match event_response {
111
VideoEvtResponseType::AsyncCmd(async_response) => {
112
let AsyncCmdResponse {
113
tag,
114
response: cmd_result,
115
} = async_response;
116
match self.desc_map.remove(&tag) {
117
Some(desc) => {
118
let cmd_response = match cmd_result {
119
Ok(r) => r,
120
Err(e) => {
121
error!("returning async error response: {}", &e);
122
e.into()
123
}
124
};
125
responses.push_back((desc, cmd_response))
126
}
127
None => match tag {
128
// TODO(b/153406792): Drain is cancelled by clearing either of the
129
// stream's queues. To work around a limitation in the VDA api, the
130
// output queue is cleared synchronously without going through VDA.
131
// Because of this, the cancellation response from VDA for the
132
// input queue might fail to find the drain's AsyncCmdTag.
133
AsyncCmdTag::Drain { stream_id: _ } => {
134
info!("ignoring unknown drain response");
135
}
136
_ => {
137
error!("dropping response for an untracked command: {:?}", tag);
138
}
139
},
140
}
141
}
142
VideoEvtResponseType::Event(evt) => {
143
self.write_event(evt)?;
144
}
145
}
146
}
147
148
if let Err(e) = self.write_responses(&mut responses) {
149
error!("Failed to write event responses: {:?}", e);
150
// Ignore result of write_event for a fatal error.
151
let _ = self.write_event(VideoEvt {
152
typ: EvtType::Error,
153
stream_id,
154
});
155
return Err(e);
156
}
157
158
Ok(())
159
}
160
161
/// Handles a `DescriptorChain` value sent via the command queue and returns a `VecDeque`
162
/// of `WritableResp` to be sent to the guest.
163
///
164
/// # Arguments
165
///
166
/// * `device` - Instance of backend device
167
/// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
168
/// `wait_ctx`
169
/// * `desc` - `DescriptorChain` to handle
170
fn handle_command_desc(
171
&mut self,
172
device: &mut dyn Device,
173
wait_ctx: &WaitContext<Token>,
174
mut desc: DescriptorChain,
175
) -> Result<VecDeque<WritableResp>> {
176
let mut responses: VecDeque<WritableResp> = Default::default();
177
let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?;
178
179
// If a destruction command comes, cancel pending requests.
180
// TODO(b/161774071): Allow `process_cmd` to return multiple responses and move this
181
// into encoder/decoder.
182
let async_responses = match cmd {
183
VideoCmd::ResourceDestroyAll {
184
stream_id,
185
queue_type,
186
} => self
187
.desc_map
188
.create_cancellation_responses(&stream_id, Some(queue_type), None),
189
VideoCmd::StreamDestroy { stream_id } => self
190
.desc_map
191
.create_cancellation_responses(&stream_id, None, None),
192
VideoCmd::QueueClear {
193
stream_id,
194
queue_type: QueueType::Output,
195
} => {
196
// TODO(b/153406792): Due to a workaround for a limitation in the VDA api,
197
// clearing the output queue doesn't go through the same Async path as clearing
198
// the input queue. However, we still need to cancel the pending resources.
199
self.desc_map.create_cancellation_responses(
200
&stream_id,
201
Some(QueueType::Output),
202
None,
203
)
204
}
205
_ => Default::default(),
206
};
207
for async_response in async_responses {
208
let AsyncCmdResponse {
209
tag,
210
response: cmd_result,
211
} = async_response;
212
let destroy_response = match cmd_result {
213
Ok(r) => r,
214
Err(e) => {
215
error!("returning async error response: {}", &e);
216
e.into()
217
}
218
};
219
match self.desc_map.remove(&tag) {
220
Some(destroy_desc) => {
221
responses.push_back((destroy_desc, destroy_response));
222
}
223
None => error!("dropping response for an untracked command: {:?}", tag),
224
}
225
}
226
227
// Process the command by the device.
228
let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx);
229
match cmd_response {
230
VideoCmdResponseType::Sync(r) => {
231
responses.push_back((desc, r));
232
}
233
VideoCmdResponseType::Async(tag) => {
234
// If the command expects an asynchronous response,
235
// store `desc` to use it after the back-end device notifies the
236
// completion.
237
self.desc_map.insert(tag, desc);
238
}
239
}
240
if let Some((stream_id, event_responses)) = event_responses_with_id {
241
self.write_event_responses(event_responses, stream_id)?;
242
}
243
244
Ok(responses)
245
}
246
247
/// Handles each command in the command queue.
248
///
249
/// # Arguments
250
///
251
/// * `device` - Instance of backend device
252
/// * `wait_ctx` - `device` may register a new `Token::Event` for a new stream session to
253
/// `wait_ctx`
254
fn handle_command_queue(
255
&mut self,
256
device: &mut dyn Device,
257
wait_ctx: &WaitContext<Token>,
258
) -> Result<()> {
259
while let Some(desc) = self.cmd_queue.pop() {
260
let mut resps = self.handle_command_desc(device, wait_ctx, desc)?;
261
self.write_responses(&mut resps)?;
262
}
263
Ok(())
264
}
265
266
/// Handles an event notified via an event.
267
///
268
/// # Arguments
269
///
270
/// * `device` - Instance of backend device
271
/// * `stream_id` - Stream session ID of the event
272
/// * `wait_ctx` - `device` may register a new `Token::Buffer` for a new stream session to
273
/// `wait_ctx`
274
fn handle_event(
275
&mut self,
276
device: &mut dyn Device,
277
stream_id: u32,
278
wait_ctx: &WaitContext<Token>,
279
) -> Result<()> {
280
if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx)
281
{
282
self.write_event_responses(event_responses, stream_id)?;
283
}
284
Ok(())
285
}
286
287
/// Handles a completed buffer barrier.
288
///
289
/// # Arguments
290
///
291
/// * `device` - Instance of backend device
292
/// * `stream_id` - Stream session ID of the event
293
/// * `wait_ctx` - `device` may deregister the completed `Token::BufferBarrier` from `wait_ctx`.
294
fn handle_buffer_barrier(
295
&mut self,
296
device: &mut dyn Device,
297
stream_id: u32,
298
wait_ctx: &WaitContext<Token>,
299
) -> Result<()> {
300
if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) {
301
self.write_event_responses(event_responses, stream_id)?;
302
}
303
Ok(())
304
}
305
306
/// Runs the video device virtio queues in a blocking way.
307
///
308
/// # Arguments
309
///
310
/// * `device` - Instance of backend device
311
/// * `kill_evt` - `Event` notified to make `run` stop and return
312
pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> {
313
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
314
(self.cmd_queue.event(), Token::CmdQueue),
315
(self.event_queue.event(), Token::EventQueue),
316
(kill_evt, Token::Kill),
317
])
318
.map_err(Error::WaitContextCreationFailed)?;
319
320
loop {
321
let wait_events = wait_ctx.wait().map_err(Error::WaitError)?;
322
323
for wait_event in wait_events.iter().filter(|e| e.is_readable) {
324
match wait_event.token {
325
Token::CmdQueue => {
326
let _ = self.cmd_queue.event().wait();
327
self.handle_command_queue(device.as_mut(), &wait_ctx)?;
328
}
329
Token::EventQueue => {
330
let _ = self.event_queue.event().wait();
331
}
332
Token::Event { id } => {
333
self.handle_event(device.as_mut(), id, &wait_ctx)?;
334
}
335
Token::BufferBarrier { id } => {
336
self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?;
337
}
338
Token::Kill => return Ok(()),
339
}
340
}
341
}
342
}
343
344
/// Runs the video device virtio queues asynchronously.
345
///
346
/// # Arguments
347
///
348
/// * `device` - Instance of backend device
349
/// * `ex` - Instance of `Executor` of asynchronous operations
350
/// * `cmd_evt` - Driver-to-device kick event for the command queue
351
/// * `event_evt` - Driver-to-device kick event for the event queue
352
#[allow(dead_code)]
353
pub async fn run_async(
354
mut self,
355
mut device: Box<dyn Device>,
356
ex: Executor,
357
cmd_evt: Event,
358
event_evt: Event,
359
) -> Result<()> {
360
let cmd_queue_evt =
361
EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
362
let event_queue_evt =
363
EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
364
365
// WaitContext to wait for the response from the encoder/decoder device.
366
let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?;
367
let device_evt = ex
368
.async_from(AsyncWrapper::new(
369
clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?,
370
))
371
.map_err(Error::EventAsyncCreationFailed)?;
372
373
loop {
374
let (
375
cmd_queue_evt,
376
device_evt,
377
// Ignore driver-to-device kicks since the event queue is write-only for a device.
378
_event_queue_evt,
379
) = select3(
380
cmd_queue_evt.next_val().boxed_local(),
381
device_evt.wait_readable().boxed_local(),
382
event_queue_evt.next_val().boxed_local(),
383
)
384
.await;
385
386
if let SelectResult::Finished(_) = cmd_queue_evt {
387
self.handle_command_queue(device.as_mut(), &device_wait_ctx)?;
388
}
389
390
if let SelectResult::Finished(_) = device_evt {
391
let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) {
392
Ok(device_events) => device_events,
393
Err(_) => {
394
error!("failed to read a device event");
395
continue;
396
}
397
};
398
for device_event in device_events {
399
// A Device must trigger only Token::Event. See [`Device::process_cmd()`].
400
if let Token::Event { id } = device_event.token {
401
self.handle_event(device.as_mut(), id, &device_wait_ctx)?;
402
} else {
403
error!(
404
"invalid event is triggered by a device {:?}",
405
device_event.token
406
);
407
}
408
}
409
}
410
}
411
}
412
}
413
414