Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/base_tokio/src/sys/linux/tube.rs
5394 views
1
// Copyright 2024 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::os::fd::AsRawFd;
6
7
use tokio::io::unix::AsyncFd;
8
9
/// An async version of `base::Tube`.
10
pub struct TubeTokio(AsyncFd<base::Tube>);
11
12
impl TubeTokio {
13
pub fn new(tube: base::Tube) -> anyhow::Result<Self> {
14
base::add_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)?;
15
Ok(Self(AsyncFd::new(tube)?))
16
}
17
18
pub async fn into_inner(self) -> base::Tube {
19
let tube = self.0.into_inner();
20
base::clear_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)
21
.expect("failed to clear O_NONBLOCK");
22
tube
23
}
24
25
pub async fn send<T: serde::Serialize + Send + 'static>(
26
&mut self,
27
msg: T,
28
) -> base::TubeResult<()> {
29
loop {
30
let mut guard = self.0.writable().await.map_err(base::TubeError::Send)?;
31
let io_result = guard.try_io(|inner| {
32
// Re-using the non-async send is potentially hazardous since it isn't explicitly
33
// written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
34
// single write syscall, it should be OK.
35
let r = inner.get_ref().send(&msg);
36
// Transpose the `std::io::Error` errors outside so that `try_io` can check them
37
// for `WouldBlock`.
38
match r {
39
Ok(x) => Ok(Ok(x)),
40
Err(base::TubeError::Send(e)) => Err(e),
41
Err(e) => Ok(Err(e)),
42
}
43
});
44
45
match io_result {
46
Ok(result) => {
47
return match result {
48
Ok(Ok(x)) => Ok(x),
49
Ok(Err(e)) => Err(e),
50
Err(e) => Err(base::TubeError::Send(e)),
51
}
52
}
53
Err(_would_block) => continue,
54
}
55
}
56
}
57
58
pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(
59
&mut self,
60
) -> base::TubeResult<T> {
61
loop {
62
let mut guard = self.0.readable().await.map_err(base::TubeError::Recv)?;
63
let io_result = guard.try_io(|inner| {
64
// Re-using the non-async recv is potentially hazardous since it isn't explicitly
65
// written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
66
// single read syscall, it should be OK.
67
let r = inner.get_ref().recv();
68
// Transpose the `std::io::Error` errors outside so that `try_io` can check them
69
// for `WouldBlock`.
70
match r {
71
Ok(x) => Ok(Ok(x)),
72
Err(base::TubeError::Recv(e)) => Err(e),
73
Err(e) => Ok(Err(e)),
74
}
75
});
76
77
match io_result {
78
Ok(result) => {
79
return match result {
80
Ok(Ok(x)) => Ok(x),
81
Ok(Err(e)) => Err(e),
82
Err(e) => Err(base::TubeError::Recv(e)),
83
}
84
}
85
Err(_would_block) => continue,
86
}
87
}
88
}
89
}
90
91