Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/base/src/sys/unix/tube.rs
5394 views
1
// Copyright 2021 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::os::unix::prelude::AsRawFd;
6
use std::os::unix::prelude::RawFd;
7
use std::time::Duration;
8
9
use serde::de::DeserializeOwned;
10
use serde::Deserialize;
11
use serde::Serialize;
12
13
use crate::descriptor::AsRawDescriptor;
14
use crate::descriptor_reflection::deserialize_with_descriptors;
15
use crate::descriptor_reflection::SerializeDescriptors;
16
use crate::handle_eintr;
17
use crate::tube::Error;
18
use crate::tube::RecvTube;
19
use crate::tube::Result;
20
use crate::tube::SendTube;
21
use crate::RawDescriptor;
22
use crate::ReadNotifier;
23
use crate::ScmSocket;
24
use crate::UnixSeqpacket;
25
use crate::SCM_SOCKET_MAX_FD_COUNT;
26
27
// This size matches the inline buffer size of CmsgBuffer.
28
const TUBE_MAX_FDS: usize = 32;
29
30
/// Bidirectional tube that support both send and recv.
31
#[derive(Serialize, Deserialize)]
32
pub struct Tube {
33
socket: ScmSocket<UnixSeqpacket>,
34
}
35
36
impl Tube {
37
/// Create a pair of connected tubes. Request is sent in one direction while response is in the
38
/// other direction.
39
pub fn pair() -> Result<(Tube, Tube)> {
40
let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?;
41
let tube1 = Tube::try_from(socket1)?;
42
let tube2 = Tube::try_from(socket2)?;
43
Ok((tube1, tube2))
44
}
45
46
/// DO NOT USE this method directly as it will become private soon (b/221484449). Use a
47
/// directional Tube pair instead.
48
#[deprecated]
49
pub fn try_clone(&self) -> Result<Self> {
50
self.socket
51
.inner()
52
.try_clone()
53
.map_err(Error::Clone)?
54
.try_into()
55
}
56
57
/// Sends a message via a Tube.
58
/// The number of file descriptors that this method can send is limited to `TUBE_MAX_FDS`.
59
/// If you want to send more descriptors, use `send_with_max_fds` instead.
60
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
61
self.send_with_max_fds(msg, TUBE_MAX_FDS)
62
}
63
64
/// Sends a message with at most `max_fds` file descriptors via a Tube.
65
/// Note that `max_fds` must not exceed `SCM_SOCKET_MAX_FD_COUNT` (= 253).
66
pub fn send_with_max_fds<T: Serialize>(&self, msg: &T, max_fds: usize) -> Result<()> {
67
if max_fds > SCM_SOCKET_MAX_FD_COUNT {
68
return Err(Error::SendTooManyFds);
69
}
70
let msg_serialize = SerializeDescriptors::new(&msg);
71
let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
72
let msg_descriptors = msg_serialize.into_descriptors();
73
74
if msg_descriptors.len() > max_fds {
75
return Err(Error::SendTooManyFds);
76
}
77
78
handle_eintr!(self.socket.send_with_fds(&msg_json, &msg_descriptors))
79
.map_err(Error::Send)?;
80
Ok(())
81
}
82
83
/// Recieves a message from a Tube.
84
/// If the sender sent file descriptors more than TUBE_MAX_FDS with `send_with_max_fds`, use
85
/// `recv_with_max_fds` instead.
86
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
87
self.recv_with_max_fds(TUBE_MAX_FDS)
88
}
89
90
/// Recieves a message with at most `max_fds` file descriptors from a Tube.
91
pub fn recv_with_max_fds<T: DeserializeOwned>(&self, max_fds: usize) -> Result<T> {
92
if max_fds > SCM_SOCKET_MAX_FD_COUNT {
93
return Err(Error::RecvTooManyFds);
94
}
95
96
// WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube
97
// is readable, then a call to `Tube::recv` will not block (which ought to be true since we
98
// use SOCK_SEQPACKET and a single recvmsg call currently).
99
100
let msg_size =
101
handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
102
// This buffer is the right size, as the size received in next_packet_size() represents the
103
// size of only the message itself and not the file descriptors. The descriptors are stored
104
// separately in msghdr::msg_control.
105
let mut msg_json = vec![0u8; msg_size];
106
107
let (msg_json_size, msg_descriptors) =
108
handle_eintr!(self.socket.recv_with_fds(&mut msg_json, max_fds))
109
.map_err(Error::Recv)?;
110
111
if msg_json_size == 0 {
112
return Err(Error::Disconnected);
113
}
114
115
deserialize_with_descriptors(
116
|| serde_json::from_slice(&msg_json[0..msg_json_size]),
117
msg_descriptors,
118
)
119
.map_err(Error::Json)
120
}
121
122
pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> {
123
self.socket
124
.inner()
125
.set_write_timeout(timeout)
126
.map_err(Error::SetSendTimeout)
127
}
128
129
pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> {
130
self.socket
131
.inner()
132
.set_read_timeout(timeout)
133
.map_err(Error::SetRecvTimeout)
134
}
135
136
#[cfg(feature = "proto_tube")]
137
fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
138
let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
139
let no_fds: [RawFd; 0] = [];
140
141
handle_eintr!(self.socket.send_with_fds(&bytes, &no_fds)).map_err(Error::Send)?;
142
143
Ok(())
144
}
145
146
#[cfg(feature = "proto_tube")]
147
fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
148
let msg_size =
149
handle_eintr!(self.socket.inner().next_packet_size()).map_err(Error::Recv)?;
150
let mut msg_bytes = vec![0u8; msg_size];
151
152
let (msg_bytes_size, _) =
153
handle_eintr!(self.socket.recv_with_fds(&mut msg_bytes, TUBE_MAX_FDS))
154
.map_err(Error::Recv)?;
155
156
if msg_bytes_size == 0 {
157
return Err(Error::Disconnected);
158
}
159
160
protobuf::Message::parse_from_bytes(&msg_bytes).map_err(Error::Proto)
161
}
162
}
163
164
impl TryFrom<UnixSeqpacket> for Tube {
165
type Error = Error;
166
167
fn try_from(socket: UnixSeqpacket) -> Result<Self> {
168
Ok(Tube {
169
socket: socket.try_into().map_err(Error::ScmSocket)?,
170
})
171
}
172
}
173
174
impl AsRawDescriptor for Tube {
175
fn as_raw_descriptor(&self) -> RawDescriptor {
176
self.socket.as_raw_descriptor()
177
}
178
}
179
180
impl AsRawFd for Tube {
181
fn as_raw_fd(&self) -> RawFd {
182
self.socket.inner().as_raw_descriptor()
183
}
184
}
185
186
impl ReadNotifier for Tube {
187
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
188
&self.socket
189
}
190
}
191
192
impl AsRawDescriptor for SendTube {
193
fn as_raw_descriptor(&self) -> RawDescriptor {
194
self.0.as_raw_descriptor()
195
}
196
}
197
198
impl AsRawDescriptor for RecvTube {
199
fn as_raw_descriptor(&self) -> RawDescriptor {
200
self.0.as_raw_descriptor()
201
}
202
}
203
204
/// Wrapper for Tube used for sending and receiving protos - avoids extra overhead of serialization
205
/// via serde_json. Since protos should be standalone objects we do not support sending of file
206
/// descriptors as a normal Tube would.
207
#[cfg(feature = "proto_tube")]
208
pub struct ProtoTube(Tube);
209
210
#[cfg(feature = "proto_tube")]
211
impl ProtoTube {
212
pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
213
Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
214
}
215
216
pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
217
self.0.send_proto(msg)
218
}
219
220
pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
221
self.0.recv_proto()
222
}
223
}
224
225
#[cfg(feature = "proto_tube")]
226
impl From<Tube> for ProtoTube {
227
fn from(tube: Tube) -> Self {
228
ProtoTube(tube)
229
}
230
}
231
232
#[cfg(all(feature = "proto_tube", test))]
233
#[allow(unused_variables)]
234
mod tests {
235
// not testing this proto specifically, just need an existing one to test the ProtoTube.
236
use protos::cdisk_spec::ComponentDisk;
237
238
use super::*;
239
240
#[test]
241
fn tube_serializes_and_deserializes() {
242
let (pt1, pt2) = ProtoTube::pair().unwrap();
243
let proto = ComponentDisk {
244
file_path: "/some/cool/path".to_string(),
245
offset: 99,
246
..ComponentDisk::new()
247
};
248
249
pt1.send_proto(&proto).unwrap();
250
251
let recv_proto = pt2.recv_proto().unwrap();
252
253
assert!(proto.eq(&recv_proto));
254
}
255
}
256
257