use std::collections::VecDeque;
use std::time::Duration;
use base::clone_descriptor;
use base::error;
use base::info;
use base::Event;
use base::WaitContext;
use cros_async::select3;
use cros_async::AsyncWrapper;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::SelectResult;
use futures::FutureExt;
use crate::virtio::video::async_cmd_desc_map::AsyncCmdDescMap;
use crate::virtio::video::command::QueueType;
use crate::virtio::video::command::VideoCmd;
use crate::virtio::video::device::AsyncCmdResponse;
use crate::virtio::video::device::AsyncCmdTag;
use crate::virtio::video::device::Device;
use crate::virtio::video::device::Token;
use crate::virtio::video::device::VideoCmdResponseType;
use crate::virtio::video::device::VideoEvtResponseType;
use crate::virtio::video::event;
use crate::virtio::video::event::EvtType;
use crate::virtio::video::event::VideoEvt;
use crate::virtio::video::response;
use crate::virtio::video::response::Response;
use crate::virtio::video::Error;
use crate::virtio::video::Result;
use crate::virtio::DescriptorChain;
use crate::virtio::Queue;
pub struct Worker {
cmd_queue: Queue,
event_queue: Queue,
desc_map: AsyncCmdDescMap,
}
type WritableResp = (DescriptorChain, response::CmdResponse);
impl Worker {
pub fn new(cmd_queue: Queue, event_queue: Queue) -> Self {
Self {
cmd_queue,
event_queue,
desc_map: Default::default(),
}
}
fn write_responses(&mut self, responses: &mut VecDeque<WritableResp>) -> Result<()> {
if responses.is_empty() {
return Ok(());
}
while let Some((mut desc, response)) = responses.pop_front() {
if let Err(e) = response.write(&mut desc.writer) {
error!(
"failed to write a command response for {:?}: {}",
response, e
);
}
self.cmd_queue.add_used(desc);
}
self.cmd_queue.trigger_interrupt();
Ok(())
}
fn write_event(&mut self, event: event::VideoEvt) -> Result<()> {
let mut desc = self
.event_queue
.pop()
.ok_or(Error::DescriptorNotAvailable)?;
event
.write(&mut desc.writer)
.map_err(|error| Error::WriteEventFailure { event, error })?;
self.event_queue.add_used(desc);
self.event_queue.trigger_interrupt();
Ok(())
}
fn write_event_responses(
&mut self,
event_responses: Vec<VideoEvtResponseType>,
stream_id: u32,
) -> Result<()> {
let mut responses: VecDeque<WritableResp> = Default::default();
for event_response in event_responses {
match event_response {
VideoEvtResponseType::AsyncCmd(async_response) => {
let AsyncCmdResponse {
tag,
response: cmd_result,
} = async_response;
match self.desc_map.remove(&tag) {
Some(desc) => {
let cmd_response = match cmd_result {
Ok(r) => r,
Err(e) => {
error!("returning async error response: {}", &e);
e.into()
}
};
responses.push_back((desc, cmd_response))
}
None => match tag {
AsyncCmdTag::Drain { stream_id: _ } => {
info!("ignoring unknown drain response");
}
_ => {
error!("dropping response for an untracked command: {:?}", tag);
}
},
}
}
VideoEvtResponseType::Event(evt) => {
self.write_event(evt)?;
}
}
}
if let Err(e) = self.write_responses(&mut responses) {
error!("Failed to write event responses: {:?}", e);
let _ = self.write_event(VideoEvt {
typ: EvtType::Error,
stream_id,
});
return Err(e);
}
Ok(())
}
fn handle_command_desc(
&mut self,
device: &mut dyn Device,
wait_ctx: &WaitContext<Token>,
mut desc: DescriptorChain,
) -> Result<VecDeque<WritableResp>> {
let mut responses: VecDeque<WritableResp> = Default::default();
let cmd = VideoCmd::from_reader(&mut desc.reader).map_err(Error::ReadFailure)?;
let async_responses = match cmd {
VideoCmd::ResourceDestroyAll {
stream_id,
queue_type,
} => self
.desc_map
.create_cancellation_responses(&stream_id, Some(queue_type), None),
VideoCmd::StreamDestroy { stream_id } => self
.desc_map
.create_cancellation_responses(&stream_id, None, None),
VideoCmd::QueueClear {
stream_id,
queue_type: QueueType::Output,
} => {
self.desc_map.create_cancellation_responses(
&stream_id,
Some(QueueType::Output),
None,
)
}
_ => Default::default(),
};
for async_response in async_responses {
let AsyncCmdResponse {
tag,
response: cmd_result,
} = async_response;
let destroy_response = match cmd_result {
Ok(r) => r,
Err(e) => {
error!("returning async error response: {}", &e);
e.into()
}
};
match self.desc_map.remove(&tag) {
Some(destroy_desc) => {
responses.push_back((destroy_desc, destroy_response));
}
None => error!("dropping response for an untracked command: {:?}", tag),
}
}
let (cmd_response, event_responses_with_id) = device.process_cmd(cmd, wait_ctx);
match cmd_response {
VideoCmdResponseType::Sync(r) => {
responses.push_back((desc, r));
}
VideoCmdResponseType::Async(tag) => {
self.desc_map.insert(tag, desc);
}
}
if let Some((stream_id, event_responses)) = event_responses_with_id {
self.write_event_responses(event_responses, stream_id)?;
}
Ok(responses)
}
fn handle_command_queue(
&mut self,
device: &mut dyn Device,
wait_ctx: &WaitContext<Token>,
) -> Result<()> {
while let Some(desc) = self.cmd_queue.pop() {
let mut resps = self.handle_command_desc(device, wait_ctx, desc)?;
self.write_responses(&mut resps)?;
}
Ok(())
}
fn handle_event(
&mut self,
device: &mut dyn Device,
stream_id: u32,
wait_ctx: &WaitContext<Token>,
) -> Result<()> {
if let Some(event_responses) = device.process_event(&mut self.desc_map, stream_id, wait_ctx)
{
self.write_event_responses(event_responses, stream_id)?;
}
Ok(())
}
fn handle_buffer_barrier(
&mut self,
device: &mut dyn Device,
stream_id: u32,
wait_ctx: &WaitContext<Token>,
) -> Result<()> {
if let Some(event_responses) = device.process_buffer_barrier(stream_id, wait_ctx) {
self.write_event_responses(event_responses, stream_id)?;
}
Ok(())
}
pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> {
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(self.cmd_queue.event(), Token::CmdQueue),
(self.event_queue.event(), Token::EventQueue),
(kill_evt, Token::Kill),
])
.map_err(Error::WaitContextCreationFailed)?;
loop {
let wait_events = wait_ctx.wait().map_err(Error::WaitError)?;
for wait_event in wait_events.iter().filter(|e| e.is_readable) {
match wait_event.token {
Token::CmdQueue => {
let _ = self.cmd_queue.event().wait();
self.handle_command_queue(device.as_mut(), &wait_ctx)?;
}
Token::EventQueue => {
let _ = self.event_queue.event().wait();
}
Token::Event { id } => {
self.handle_event(device.as_mut(), id, &wait_ctx)?;
}
Token::BufferBarrier { id } => {
self.handle_buffer_barrier(device.as_mut(), id, &wait_ctx)?;
}
Token::Kill => return Ok(()),
}
}
}
}
#[allow(dead_code)]
pub async fn run_async(
mut self,
mut device: Box<dyn Device>,
ex: Executor,
cmd_evt: Event,
event_evt: Event,
) -> Result<()> {
let cmd_queue_evt =
EventAsync::new(cmd_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
let event_queue_evt =
EventAsync::new(event_evt, &ex).map_err(Error::EventAsyncCreationFailed)?;
let device_wait_ctx = WaitContext::new().map_err(Error::WaitContextCreationFailed)?;
let device_evt = ex
.async_from(AsyncWrapper::new(
clone_descriptor(&device_wait_ctx).map_err(Error::CloneDescriptorFailed)?,
))
.map_err(Error::EventAsyncCreationFailed)?;
loop {
let (
cmd_queue_evt,
device_evt,
_event_queue_evt,
) = select3(
cmd_queue_evt.next_val().boxed_local(),
device_evt.wait_readable().boxed_local(),
event_queue_evt.next_val().boxed_local(),
)
.await;
if let SelectResult::Finished(_) = cmd_queue_evt {
self.handle_command_queue(device.as_mut(), &device_wait_ctx)?;
}
if let SelectResult::Finished(_) = device_evt {
let device_events = match device_wait_ctx.wait_timeout(Duration::from_secs(0)) {
Ok(device_events) => device_events,
Err(_) => {
error!("failed to read a device event");
continue;
}
};
for device_event in device_events {
if let Token::Event { id } = device_event.token {
self.handle_event(device.as_mut(), id, &device_wait_ctx)?;
} else {
error!(
"invalid event is triggered by a device {:?}",
device_event.token
);
}
}
}
}
}
}