Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/base/src/sys/windows/tube.rs
5394 views
1
// Copyright 2021 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::io;
6
use std::io::Cursor;
7
use std::io::Read;
8
use std::io::Write;
9
use std::mem;
10
use std::os::windows::io::AsRawHandle;
11
use std::os::windows::io::RawHandle;
12
use std::time::Duration;
13
14
use log::warn;
15
use serde::de::DeserializeOwned;
16
use serde::Deserialize;
17
use serde::Serialize;
18
use serde::Serializer;
19
use zerocopy::FromBytes;
20
use zerocopy::Immutable;
21
use zerocopy::IntoBytes;
22
use zerocopy::KnownLayout;
23
24
use crate::descriptor::AsRawDescriptor;
25
use crate::descriptor::FromRawDescriptor;
26
use crate::descriptor::SafeDescriptor;
27
use crate::descriptor_reflection::deserialize_with_descriptors;
28
use crate::descriptor_reflection::SerializeDescriptors;
29
use crate::tube::Error;
30
use crate::tube::RecvTube;
31
use crate::tube::Result;
32
use crate::tube::SendTube;
33
use crate::BlockingMode;
34
use crate::CloseNotifier;
35
use crate::Event;
36
use crate::EventToken;
37
use crate::FramingMode;
38
use crate::PipeConnection;
39
use crate::RawDescriptor;
40
use crate::ReadNotifier;
41
use crate::StreamChannel;
42
43
/// Bidirectional tube that support both send and recv.
44
///
45
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
46
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
47
/// process:
48
/// 1. B's target_pid must be set to the current PID *before* serialization. There is a
49
/// serialization hook that sets it to the current PID automatically if target_pid is unset.
50
/// 2. A's target_pid must be set to the PID of the process where B was sent.
51
///
52
/// If instead you are sending both A and B to separate processes, then:
53
/// 1. A's target_pid must be set to B's pid, manually.
54
/// 2. B's target_pid must be set to A's pid, manually.
55
///
56
/// Automating all of this and getting a completely clean interface is tricky. We would need
57
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
58
/// state about PIDs between the ends. There are alternatives like reusing the underlying
59
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
60
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
61
#[derive(Serialize, Deserialize, Debug)]
62
pub struct Tube {
63
socket: StreamChannel,
64
65
// Default target_pid to current PID on serialization (see `Tube` comment header for details).
66
#[serde(serialize_with = "set_tube_pid_on_serialize")]
67
target_pid: Option<u32>,
68
}
69
70
/// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
71
/// default it to the current process, because the other end will be in the current process.
72
fn set_tube_pid_on_serialize<S>(
73
existing_pid_value: &Option<u32>,
74
serializer: S,
75
) -> std::result::Result<S::Ok, S::Error>
76
where
77
S: Serializer,
78
{
79
match existing_pid_value {
80
Some(pid) => serializer.serialize_u32(*pid),
81
None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
82
}
83
}
84
85
#[derive(Copy, Clone, Debug, Default, FromBytes, Immutable, IntoBytes, KnownLayout)]
86
#[repr(C)]
87
struct MsgHeader {
88
msg_json_size: usize,
89
descriptor_json_size: usize,
90
}
91
92
/// Tube used to delegate handle duplication. It is installed by `set_duplicate_handle_tube` and
/// consulted by `duplicate_handle` whenever a target PID is supplied.
static DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
93
/// Alias PID installed by `set_alias_pid`. When present, it is serialized instead of the current
/// process ID for a `Tube` whose `target_pid` is unset.
static ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
94
95
/// Set a tube to delegate duplicate handle calls.
96
pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
97
DH_TUBE.lock().replace(dh_tube);
98
}
99
100
/// Set alias pid for use with a DuplicateHandleTube.
101
pub fn set_alias_pid(alias_pid: u32) {
102
ALIAS_PID.lock().replace(alias_pid);
103
}
104
105
impl Tube {
106
/// Create a pair of connected tubes. Request is sent in one direction while response is
107
/// received in the other direction.
108
/// The result is in the form (server, client).
109
pub fn pair() -> Result<(Tube, Tube)> {
110
let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
111
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
112
113
Ok((Tube::new(socket1), Tube::new(socket2)))
114
}
115
116
/// Create a pair of connected tubes with the specified buffer size.
117
/// Request is sent in one direction while response is received in the other direction.
118
/// The result is in the form (server, client).
119
pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
120
let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
121
BlockingMode::Blocking,
122
FramingMode::Message,
123
buffer_size,
124
)
125
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
126
let tube1 = Tube::new(socket1);
127
let tube2 = Tube::new(socket2);
128
Ok((tube1, tube2))
129
}
130
131
// Create a new `Tube`.
132
pub fn new(socket: StreamChannel) -> Tube {
133
Tube {
134
socket,
135
target_pid: None,
136
}
137
}
138
139
pub(crate) fn try_clone(&self) -> Result<Self> {
140
Ok(Tube {
141
socket: self.socket.try_clone().map_err(Error::Clone)?,
142
target_pid: self.target_pid,
143
})
144
}
145
146
fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
147
let bytes = msg.write_to_bytes().map_err(Error::Proto)?;
148
let size_header = bytes.len();
149
150
let mut data_packet =
151
Cursor::new(Vec::with_capacity(mem::size_of::<usize>() + size_header));
152
data_packet
153
.write(&size_header.to_le_bytes())
154
.map_err(Error::from_send_io_buf_error)?;
155
data_packet.write(&bytes).map_err(Error::SendIoBuf)?;
156
self.socket
157
.write_immutable(&data_packet.into_inner())
158
.map_err(Error::from_send_error)?;
159
160
Ok(())
161
}
162
163
fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
164
let mut header_bytes = [0u8; mem::size_of::<usize>()];
165
perform_read(&mut |buf| (&self.socket).read(buf), &mut header_bytes)
166
.map_err(Error::from_recv_io_error)?;
167
let size_header = usize::from_le_bytes(header_bytes);
168
169
let mut proto_bytes = vec![0u8; size_header];
170
perform_read(&mut |buf| (&self.socket).read(buf), &mut proto_bytes)
171
.map_err(Error::from_recv_io_error)?;
172
protobuf::Message::parse_from_bytes(&proto_bytes).map_err(Error::Proto)
173
}
174
175
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
176
serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
177
}
178
179
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
180
deserialize_and_recv(|buf| (&self.socket).read(buf))
181
}
182
183
/// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
184
/// documentation to ensure you have a server pipe before calling.
185
#[cfg(windows)]
186
pub fn flush_blocking(&mut self) -> Result<()> {
187
self.socket.flush_blocking().map_err(Error::Flush)
188
}
189
190
/// For Tubes that span processes, this method must be used to set the PID of the other end
191
/// of the Tube, otherwise sending handles to the other end won't work.
192
pub fn set_target_pid(&mut self, target_pid: u32) {
193
self.target_pid = Some(target_pid);
194
}
195
196
/// Returns the PID of the process at the other end of the Tube, if any is set.
197
pub fn target_pid(&self) -> Option<u32> {
198
self.target_pid
199
}
200
201
/// TODO(b/145998747, b/184398671): this method should be removed.
202
pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
203
unimplemented!("To be removed/refactored upstream.");
204
}
205
206
/// TODO(b/145998747, b/184398671): this method should be removed.
207
pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
208
unimplemented!("To be removed/refactored upstream.");
209
}
210
211
pub fn get_read_notifier_event(&self) -> &Event {
212
self.socket.get_read_notifier_event()
213
}
214
215
pub fn get_close_notifier_event(&self) -> &Event {
216
self.socket.get_close_notifier_event()
217
}
218
}
219
220
pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
221
write_fn: F,
222
msg: &T,
223
target_pid: Option<u32>,
224
) -> Result<()> {
225
let msg_serialize = SerializeDescriptors::new(&msg);
226
let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
227
let msg_descriptors = msg_serialize.into_descriptors();
228
229
let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
230
for desc in msg_descriptors {
231
// Safe because these handles are guaranteed to be valid. Details:
232
// 1. They come from base::descriptor_reflection::with_as_descriptor.
233
// 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
234
// SafeDescriptor).
235
// 3. The owning object is borrowed by msg until sending is complete.
236
duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
237
}
238
239
let descriptor_json = if duped_descriptors.is_empty() {
240
None
241
} else {
242
Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
243
};
244
245
let header = MsgHeader {
246
msg_json_size: msg_json.len(),
247
descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
248
};
249
250
let mut data_packet = Cursor::new(Vec::with_capacity(
251
header.as_bytes().len() + header.msg_json_size + header.descriptor_json_size,
252
));
253
data_packet
254
.write(header.as_bytes())
255
.map_err(Error::SendIoBuf)?;
256
data_packet
257
.write(msg_json.as_slice())
258
.map_err(Error::SendIoBuf)?;
259
if let Some(descriptor_json) = descriptor_json {
260
data_packet
261
.write(descriptor_json.as_slice())
262
.map_err(Error::SendIoBuf)?;
263
}
264
265
// Multiple writers (producers) are safe because each write is atomic.
266
let data_bytes = data_packet.into_inner();
267
268
write_fn(&data_bytes).map_err(Error::from_send_error)?;
269
Ok(())
270
}
271
272
fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
273
match target_pid {
274
Some(pid) => match &*DH_TUBE.lock() {
275
Some(tube) => tube.request_duplicate_handle(pid, desc),
276
None => {
277
win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
278
}
279
},
280
None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
281
}
282
}
283
284
/// Reads one part of a Tube packet and verifies that the supplied buffer was filled completely.
///
/// `read_fn` is invoked exactly once with `buf`. Any error it returns is propagated unchanged; a
/// byte count different from `buf.len()` is reported as an `UnexpectedEof` error.
fn perform_read<F: FnMut(&mut [u8]) -> io::Result<usize>>(
    read_fn: &mut F,
    buf: &mut [u8],
) -> io::Result<usize> {
    let expected = buf.len();
    match read_fn(buf)? {
        received if received == expected => Ok(received),
        received => Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            format!(
                "failed to fill whole buffer, expected {} got {}",
                expected, received
            ),
        )),
    }
}
305
306
/// Deserializes a Tube packet by calling the supplied read function. This function MUST
307
/// assert that the buffer was filled.
308
pub fn deserialize_and_recv<T: DeserializeOwned, F: FnMut(&mut [u8]) -> io::Result<usize>>(
309
mut read_fn: F,
310
) -> Result<T> {
311
let mut header = MsgHeader::default();
312
perform_read(&mut read_fn, header.as_mut_bytes()).map_err(Error::from_recv_io_error)?;
313
314
let mut msg_json = vec![0u8; header.msg_json_size];
315
perform_read(&mut read_fn, msg_json.as_mut_slice()).map_err(Error::from_recv_io_error)?;
316
317
if msg_json.is_empty() {
318
// This means we got a message header, but there is no json body (due to a zero size in
319
// the header). This should never happen because it means the receiver is getting no
320
// data whatsoever from the sender.
321
return Err(Error::RecvUnexpectedEmptyBody);
322
}
323
324
let descriptor_usizes: Vec<usize> = if header.descriptor_json_size > 0 {
325
let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
326
perform_read(&mut read_fn, msg_descriptors_json.as_mut_slice())
327
.map_err(Error::from_recv_io_error)?;
328
serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?
329
} else {
330
Vec::new()
331
};
332
333
let msg_descriptors = descriptor_usizes.into_iter().map(|item| {
334
// SAFETY: the usizes are RawDescriptors that were duplicated and converted to usize in the
335
// send method.
336
unsafe { SafeDescriptor::from_raw_descriptor(item as RawDescriptor) }
337
});
338
339
deserialize_with_descriptors(|| serde_json::from_slice(&msg_json), msg_descriptors)
340
.map_err(Error::Json)
341
}
342
343
#[derive(EventToken, Eq, PartialEq, Copy, Clone)]
344
enum Token {
345
SocketReady,
346
}
347
348
impl AsRawDescriptor for Tube {
349
fn as_raw_descriptor(&self) -> RawDescriptor {
350
self.socket.as_raw_descriptor()
351
}
352
}
353
354
impl AsRawHandle for Tube {
355
fn as_raw_handle(&self) -> RawHandle {
356
self.as_raw_descriptor()
357
}
358
}
359
360
impl ReadNotifier for Tube {
361
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
362
self.socket.get_read_notifier()
363
}
364
}
365
366
impl CloseNotifier for Tube {
367
fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
368
self.socket.get_close_notifier()
369
}
370
}
371
372
impl AsRawDescriptor for SendTube {
373
fn as_raw_descriptor(&self) -> RawDescriptor {
374
self.0.as_raw_descriptor()
375
}
376
}
377
378
impl AsRawDescriptor for RecvTube {
379
fn as_raw_descriptor(&self) -> RawDescriptor {
380
self.0.as_raw_descriptor()
381
}
382
}
383
384
impl CloseNotifier for SendTube {
385
fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
386
self.0.get_close_notifier()
387
}
388
}
389
390
impl CloseNotifier for RecvTube {
391
fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
392
self.0.get_close_notifier()
393
}
394
}
395
396
/// A request to duplicate a handle to a target process.
397
#[derive(Serialize, Deserialize, Debug)]
398
pub struct DuplicateHandleRequest {
399
pub target_alias_pid: u32,
400
pub handle: usize,
401
}
402
403
/// Contains a duplicated handle or None if an error occurred.
404
#[derive(Serialize, Deserialize, Debug)]
405
pub struct DuplicateHandleResponse {
406
pub handle: Option<usize>,
407
}
408
409
/// Wrapper for tube which is used to delegate DuplicateHandle function calls to
410
/// the broker process.
411
#[derive(Serialize, Deserialize, Debug)]
412
pub struct DuplicateHandleTube(Tube);
413
414
impl DuplicateHandleTube {
415
pub fn new(tube: Tube) -> Self {
416
Self(tube)
417
}
418
419
pub fn request_duplicate_handle(
420
&self,
421
target_alias_pid: u32,
422
handle: RawHandle,
423
) -> Result<RawHandle> {
424
let req = DuplicateHandleRequest {
425
target_alias_pid,
426
handle: handle as usize,
427
};
428
self.0.send(&req)?;
429
let res: DuplicateHandleResponse = self.0.recv()?;
430
res.handle
431
.map(|h| h as RawHandle)
432
.ok_or(Error::BrokerDupDescriptor)
433
}
434
}
435
436
/// Wrapper for Tube used for sending and recving protos. The main usecase is to send a message
437
/// without serialization bloat caused from `serde-json`.
438
#[derive(Serialize, Deserialize)]
439
pub struct ProtoTube(Tube);
440
441
impl ProtoTube {
442
pub fn pair() -> Result<(ProtoTube, ProtoTube)> {
443
Tube::pair().map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
444
}
445
446
pub fn pair_with_buffer_size(size: usize) -> Result<(ProtoTube, ProtoTube)> {
447
Tube::pair_with_buffer_size(size).map(|(t1, t2)| (ProtoTube(t1), ProtoTube(t2)))
448
}
449
450
pub fn send_proto<M: protobuf::Message>(&self, msg: &M) -> Result<()> {
451
self.0.send_proto(msg)
452
}
453
454
pub fn recv_proto<M: protobuf::Message>(&self) -> Result<M> {
455
self.0.recv_proto()
456
}
457
}
458
459
impl ReadNotifier for ProtoTube {
460
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
461
self.0.get_read_notifier()
462
}
463
}
464
465
impl AsRawDescriptor for ProtoTube {
466
fn as_raw_descriptor(&self) -> RawDescriptor {
467
self.0.as_raw_descriptor()
468
}
469
}
470
471
/// A wrapper around a named pipe that uses Tube serialization.
472
///
473
/// This limited form of `Tube` offers absolutely no notifier support, and can only send/recv
474
/// blocking messages.
475
pub struct PipeTube {
476
pipe: PipeConnection,
477
478
// Default target_pid to current PID on serialization (see `Tube` comment header for details).
479
target_pid: Option<u32>,
480
}
481
482
impl PipeTube {
483
pub fn from(pipe: PipeConnection, target_pid: Option<u32>) -> Self {
484
Self { pipe, target_pid }
485
}
486
487
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
488
serialize_and_send(|buf| self.pipe.write(buf), msg, self.target_pid)
489
}
490
491
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
492
deserialize_and_recv(|buf| {
493
// SAFETY:
494
// 1. We are reading bytes, so no matter what data is on the pipe, it is representable
495
// as bytes.
496
// 2. A read is quantized in bytes, so no partial reads are possible.
497
unsafe { self.pipe.read(buf) }
498
})
499
}
500
}
501
502
/// Wrapper around a Tube which is known to be the server end of a named pipe. This wrapper ensures
503
/// that the Tube is flushed before it is dropped.
504
pub struct FlushOnDropTube(pub Tube);
505
506
impl FlushOnDropTube {
507
pub fn from(tube: Tube) -> Self {
508
Self(tube)
509
}
510
}
511
512
impl Drop for FlushOnDropTube {
513
fn drop(&mut self) {
514
if let Err(e) = self.0.flush_blocking() {
515
warn!("failed to flush Tube: {}", e)
516
}
517
}
518
}
519
520
impl Error {
521
fn map_io_error(e: io::Error, err_ctor: fn(io::Error) -> Error) -> Error {
522
if e.kind() == io::ErrorKind::UnexpectedEof || e.kind() == io::ErrorKind::BrokenPipe {
523
Error::Disconnected
524
} else {
525
err_ctor(e)
526
}
527
}
528
529
fn from_recv_io_error(e: io::Error) -> Error {
530
Self::map_io_error(e, Error::Recv)
531
}
532
533
fn from_send_error(e: io::Error) -> Error {
534
Self::map_io_error(e, Error::Send)
535
}
536
537
fn from_send_io_buf_error(e: io::Error) -> Error {
538
Self::map_io_error(e, Error::SendIoBuf)
539
}
540
}
541
542