#[cfg(any(target_os = "android", target_os = "linux"))]
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use remain::sorted;
use thiserror::Error;
use crate::BoxError;
use crate::SampleFormat;
use crate::StreamDirection;
use crate::StreamEffect;
type GenericResult<T> = std::result::Result<T, BoxError>;
pub trait BufferSet {
fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>;
fn ignore(&mut self) -> GenericResult<()>;
}
#[sorted]
#[derive(Error, Debug)]
pub enum Error {
#[error("Provided number of frames {0} exceeds requested number of frames {1}")]
TooManyFrames(usize, usize),
}
pub struct ServerRequest<'a> {
requested_frames: usize,
buffer_set: &'a mut dyn BufferSet,
}
impl<'a> ServerRequest<'a> {
pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self {
Self {
requested_frames,
buffer_set,
}
}
pub fn requested_frames(&self) -> usize {
self.requested_frames
}
pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> {
if frames > self.requested_frames {
return Err(Box::new(Error::TooManyFrames(
frames,
self.requested_frames,
)));
}
self.buffer_set.callback(offset, frames)
}
pub fn ignore_request(self) -> GenericResult<()> {
self.buffer_set.ignore()
}
}
pub trait ShmStream: Send {
fn frame_size(&self) -> usize;
fn num_channels(&self) -> usize;
fn frame_rate(&self) -> u32;
fn wait_for_next_action_with_timeout(
&mut self,
timeout: Duration,
) -> GenericResult<Option<ServerRequest>>;
}
pub trait SharedMemory {
type Error: std::error::Error;
fn anon(size: u64) -> Result<Self, Self::Error>
where
Self: Sized;
fn size(&self) -> u64;
#[cfg(any(target_os = "android", target_os = "linux"))]
fn as_raw_fd(&self) -> RawFd;
}
pub trait ShmStreamSource<E: std::error::Error>: Send {
#[allow(clippy::too_many_arguments)]
fn new_stream(
&mut self,
direction: StreamDirection,
num_channels: usize,
format: SampleFormat,
frame_rate: u32,
buffer_size: usize,
effects: &[StreamEffect],
client_shm: &dyn SharedMemory<Error = E>,
buffer_offsets: [u64; 2],
) -> GenericResult<Box<dyn ShmStream>>;
#[cfg(any(target_os = "android", target_os = "linux"))]
fn keep_fds(&self) -> Vec<RawFd> {
Vec::new()
}
}
pub struct NullShmStream {
num_channels: usize,
frame_rate: u32,
buffer_size: usize,
frame_size: usize,
interval: Duration,
next_frame: Duration,
start_time: Instant,
}
impl NullShmStream {
pub fn new(
buffer_size: usize,
num_channels: usize,
format: SampleFormat,
frame_rate: u32,
) -> Self {
let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
Self {
num_channels,
frame_rate,
buffer_size,
frame_size: format.sample_bytes() * num_channels,
interval,
next_frame: interval,
start_time: Instant::now(),
}
}
}
impl BufferSet for NullShmStream {
fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
Ok(())
}
fn ignore(&mut self) -> GenericResult<()> {
Ok(())
}
}
impl ShmStream for NullShmStream {
fn frame_size(&self) -> usize {
self.frame_size
}
fn num_channels(&self) -> usize {
self.num_channels
}
fn frame_rate(&self) -> u32 {
self.frame_rate
}
fn wait_for_next_action_with_timeout(
&mut self,
timeout: Duration,
) -> GenericResult<Option<ServerRequest>> {
let elapsed = self.start_time.elapsed();
if elapsed < self.next_frame {
if timeout < self.next_frame - elapsed {
std::thread::sleep(timeout);
return Ok(None);
} else {
std::thread::sleep(self.next_frame - elapsed);
}
}
self.next_frame += self.interval;
Ok(Some(ServerRequest::new(self.buffer_size, self)))
}
}
#[derive(Default)]
pub struct NullShmStreamSource;
impl NullShmStreamSource {
pub fn new() -> Self {
NullShmStreamSource
}
}
impl<E: std::error::Error> ShmStreamSource<E> for NullShmStreamSource {
fn new_stream(
&mut self,
_direction: StreamDirection,
num_channels: usize,
format: SampleFormat,
frame_rate: u32,
buffer_size: usize,
_effects: &[StreamEffect],
_client_shm: &dyn SharedMemory<Error = E>,
_buffer_offsets: [u64; 2],
) -> GenericResult<Box<dyn ShmStream>> {
let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate);
Ok(Box::new(new_stream))
}
}
#[derive(Clone)]
pub struct MockShmStream {
num_channels: usize,
frame_rate: u32,
request_size: usize,
frame_size: usize,
request_notifier: Arc<(Mutex<bool>, Condvar)>,
}
impl MockShmStream {
pub fn new(
num_channels: usize,
frame_rate: u32,
format: SampleFormat,
buffer_size: usize,
) -> Self {
#[allow(clippy::mutex_atomic)]
Self {
num_channels,
frame_rate,
request_size: buffer_size,
frame_size: format.sample_bytes() * num_channels,
request_notifier: Arc::new((Mutex::new(false), Condvar::new())),
}
}
pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool {
let (lock, cvar) = &*self.request_notifier;
let mut requested = lock.lock().unwrap();
*requested = true;
cvar.notify_one();
let start_time = Instant::now();
while *requested {
requested = cvar.wait_timeout(requested, timeout).unwrap().0;
if start_time.elapsed() > timeout {
*requested = false;
return false;
}
}
true
}
fn notify_request(&mut self) {
let (lock, cvar) = &*self.request_notifier;
let mut requested = lock.lock().unwrap();
*requested = false;
cvar.notify_one();
}
}
impl BufferSet for MockShmStream {
fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
self.notify_request();
Ok(())
}
fn ignore(&mut self) -> GenericResult<()> {
self.notify_request();
Ok(())
}
}
impl ShmStream for MockShmStream {
fn frame_size(&self) -> usize {
self.frame_size
}
fn num_channels(&self) -> usize {
self.num_channels
}
fn frame_rate(&self) -> u32 {
self.frame_rate
}
fn wait_for_next_action_with_timeout(
&mut self,
timeout: Duration,
) -> GenericResult<Option<ServerRequest>> {
{
let start_time = Instant::now();
let (lock, cvar) = &*self.request_notifier;
let mut requested = lock.lock().unwrap();
while !*requested {
requested = cvar.wait_timeout(requested, timeout).unwrap().0;
if start_time.elapsed() > timeout {
return Ok(None);
}
}
}
Ok(Some(ServerRequest::new(self.request_size, self)))
}
}
#[derive(Clone, Default)]
pub struct MockShmStreamSource {
last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>,
}
impl MockShmStreamSource {
pub fn new() -> Self {
Default::default()
}
pub fn get_last_stream(&self) -> MockShmStream {
let (last_stream, cvar) = &*self.last_stream;
let mut stream = last_stream.lock().unwrap();
loop {
match &*stream {
None => stream = cvar.wait(stream).unwrap(),
Some(ref s) => return s.clone(),
};
}
}
}
impl<E: std::error::Error> ShmStreamSource<E> for MockShmStreamSource {
fn new_stream(
&mut self,
_direction: StreamDirection,
num_channels: usize,
format: SampleFormat,
frame_rate: u32,
buffer_size: usize,
_effects: &[StreamEffect],
_client_shm: &dyn SharedMemory<Error = E>,
_buffer_offsets: [u64; 2],
) -> GenericResult<Box<dyn ShmStream>> {
let (last_stream, cvar) = &*self.last_stream;
let mut stream = last_stream.lock().unwrap();
let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size);
*stream = Some(new_stream.clone());
cvar.notify_one();
Ok(Box::new(new_stream))
}
}
#[cfg(all(test, unix))]
pub mod tests {
use super::*;
struct MockSharedMemory {}
impl SharedMemory for MockSharedMemory {
type Error = super::Error;
fn anon(_: u64) -> Result<Self, Self::Error> {
Ok(MockSharedMemory {})
}
fn size(&self) -> u64 {
0
}
#[cfg(any(target_os = "android", target_os = "linux"))]
fn as_raw_fd(&self) -> RawFd {
0
}
}
#[test]
fn mock_trigger_callback() {
let stream_source = MockShmStreamSource::new();
let mut thread_stream_source = stream_source.clone();
let buffer_size = 480;
let num_channels = 2;
let format = SampleFormat::S24LE;
let shm = MockSharedMemory {};
let handle = std::thread::spawn(move || {
let mut stream = thread_stream_source
.new_stream(
StreamDirection::Playback,
num_channels,
format,
44100,
buffer_size,
&[],
&shm,
[400, 8000],
)
.expect("Failed to create stream");
let request = stream
.wait_for_next_action_with_timeout(Duration::from_secs(5))
.expect("Failed to wait for next action");
match request {
Some(r) => {
let requested = r.requested_frames();
r.set_buffer_offset_and_frames(872, requested)
.expect("Failed to set buffer offset and frames");
requested
}
None => 0,
}
});
let mut stream = stream_source.get_last_stream();
assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1)));
let requested_frames = handle.join().expect("Failed to join thread");
assert_eq!(requested_frames, buffer_size);
}
#[test]
fn null_consumption_rate() {
let frame_rate = 44100;
let buffer_size = 480;
let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
let shm = MockSharedMemory {};
let start = Instant::now();
let mut stream_source = NullShmStreamSource::new();
let mut stream = stream_source
.new_stream(
StreamDirection::Playback,
2,
SampleFormat::S24LE,
frame_rate,
buffer_size,
&[],
&shm,
[400, 8000],
)
.expect("Failed to create stream");
let timeout = Duration::from_secs(5);
let request = stream
.wait_for_next_action_with_timeout(timeout)
.expect("Failed to wait for first request")
.expect("First request should not have timed out");
request
.set_buffer_offset_and_frames(276, 480)
.expect("Failed to set buffer offset and length");
let _request = stream
.wait_for_next_action_with_timeout(timeout)
.expect("Failed to wait for second request");
let elapsed = start.elapsed();
assert!(
elapsed > interval,
"wait_for_next_action_with_timeout didn't block long enough: {elapsed:?}"
);
assert!(
elapsed < timeout,
"wait_for_next_action_with_timeout blocked for too long: {elapsed:?}"
);
}
}