use base::warn;
use base::AsRawDescriptor;
use base::Descriptor;
use base::Error;
use base::Event;
use base::Tube;
use base::TubeError;
use base::TubeResult;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use winapi::um::ioapiset::CancelIoEx;
/// Async wrapper around a [`Tube`] for use from tokio tasks.
///
/// `TubeTokio::new` moves the wrapped tube into a blocking worker task. `send` and `recv` do not
/// touch the tube directly; they pass closures to that worker over `cmd_tx` and await the reply.
pub struct TubeTokio {
// Handle of the blocking worker task; it resolves to the wrapped `Tube` once the task finishes.
worker: tokio::task::JoinHandle<Tube>,
// Sending half of the command channel. Each boxed closure is invoked by the worker with a
// reference to the tube.
cmd_tx: mpsc::Sender<Box<dyn FnOnce(&Tube) + Send>>,
// Clone of the tube's read notifier event, awaited by `recv` before a read is queued.
read_notifier: Event,
// The tube's raw descriptor, captured at construction and passed to `CancelIoEx` by `into_inner`.
tube_descriptor: Descriptor,
}
impl TubeTokio {
    /// Wraps `tube`, moving it into a blocking worker task that runs queued commands against it.
    ///
    /// A clone of the tube's read notifier event and a copy of its raw descriptor are taken
    /// before the tube is handed to the worker, so `recv` and `into_inner` can use them while
    /// the worker owns the tube.
    ///
    /// # Errors
    ///
    /// Returns an error if cloning the read notifier event fails.
    pub fn new(mut tube: Tube) -> anyhow::Result<Self> {
        let read_notifier = tube.get_read_notifier_event().try_clone()?;
        let tube_descriptor = Descriptor(tube.as_raw_descriptor());

        // The command channel has capacity 1.
        let (cmd_tx, mut commands) = mpsc::channel::<Box<dyn FnOnce(&Tube) + Send>>(1);
        let worker = tokio::task::spawn_blocking(move || {
            // `blocking_recv` yields `None` once the channel is closed and drained; that ends
            // the iteration and the tube is handed back as the task's result.
            for command in std::iter::from_fn(|| commands.blocking_recv()) {
                command(&mut tube);
            }
            tube
        });

        Ok(Self {
            worker,
            cmd_tx,
            read_notifier,
            tube_descriptor,
        })
    }

    /// Shuts the worker down and returns the wrapped [`Tube`].
    ///
    /// The command sender is dropped, `CancelIoEx` is issued on the tube's handle (a failure is
    /// only logged), and then the worker task is awaited.
    ///
    /// # Panics
    ///
    /// Panics if joining the worker task fails.
    pub async fn into_inner(self) -> Tube {
        let Self {
            worker,
            cmd_tx,
            tube_descriptor,
            ..
        } = self;

        // Dropping this sender is what allows the worker's receive loop to end.
        drop(cmd_tx);

        // Presumably this unblocks a worker that is stuck inside a tube operation so the join
        // below cannot hang; whether a queued command has started yet is not known here.
        //
        // SAFETY: The handle was obtained from the tube in `new`, and the tube is owned by the
        // worker task, which has not been joined yet, so the handle should still be valid.
        let cancel_succeeded =
            unsafe { CancelIoEx(tube_descriptor.0, std::ptr::null_mut()) } != 0;
        if !cancel_succeeded {
            warn!(
                "Cancel IO for handle:{:?} failed with {}",
                tube_descriptor.0,
                Error::last()
            );
        }

        worker.await.expect("failed to join tube worker")
    }

    /// Sends `msg` over the tube by running `Tube::send` on the worker task.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Tube::send`, or `TubeError::OperationCancelled` if the
    /// worker dropped the reply channel without answering.
    ///
    /// # Panics
    ///
    /// Panics if the command cannot be queued because the worker is gone.
    pub async fn send<T: serde::Serialize + Send + 'static>(&mut self, msg: T) -> TubeResult<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let command: Box<dyn FnOnce(&Tube) + Send> = Box::new(move |tube| {
            // The receiver may already be gone; there is nobody left to report to in that case.
            let _ = reply_tx.send(tube.send(&msg));
        });
        self.cmd_tx.send(command).await.expect("worker missing");

        match reply_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(TubeError::OperationCancelled),
        }
    }

    /// Receives a message from the tube by running `Tube::recv` on the worker task.
    ///
    /// The read notifier event is awaited first, and only once that wait completes is the read
    /// queued on the worker.
    ///
    /// # Errors
    ///
    /// Returns `TubeError::Recv` if waiting on the read notifier fails, the error produced by
    /// `Tube::recv`, or `TubeError::OperationCancelled` if the worker dropped the reply channel
    /// without answering.
    ///
    /// # Panics
    ///
    /// Panics if the command cannot be queued because the worker is gone.
    pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(&mut self) -> TubeResult<T> {
        // The wait is done directly on the event clone; resetting the event is presumably left
        // to `Tube::recv` itself (verify against the `Tube` implementation).
        if let Err(wait_err) =
            base::sys::windows::async_wait_for_single_object(&self.read_notifier).await
        {
            return Err(TubeError::Recv(wait_err));
        }

        let (reply_tx, reply_rx) = oneshot::channel();
        let command: Box<dyn FnOnce(&Tube) + Send> = Box::new(move |tube| {
            // The receiver may already be gone; there is nobody left to report to in that case.
            let _ = reply_tx.send(tube.recv());
        });
        self.cmd_tx.send(command).await.expect("worker missing");

        match reply_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(TubeError::OperationCancelled),
        }
    }
}