Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/base/src/sys/windows/stream_channel.rs
5394 views
1
// Copyright 2022 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::io;
6
use std::sync::Arc;
7
8
use log::error;
9
use log::warn;
10
use serde::ser::SerializeStruct;
11
use serde::Deserialize;
12
use serde::Serialize;
13
use serde::Serializer;
14
use sync::Mutex;
15
16
use super::named_pipes;
17
use super::named_pipes::PipeConnection;
18
use super::MultiProcessMutex;
19
use super::RawDescriptor;
20
use super::Result;
21
use crate::descriptor::AsRawDescriptor;
22
use crate::CloseNotifier;
23
use crate::Event;
24
use crate::ReadNotifier;
25
26
/// How data written to a [`StreamChannel`] is framed.
///
/// Mapped 1:1 onto `named_pipes::FramingMode` by the `From` impl below.
#[derive(Copy, Clone)]
pub enum FramingMode {
    /// Message boundaries are preserved (maps to `named_pipes::FramingMode::Message`).
    Message,
    /// Plain byte stream; no message boundaries (maps to `named_pipes::FramingMode::Byte`).
    Byte,
}
31
32
/// Whether reads/writes on a [`StreamChannel`] block.
///
/// Mapped onto `named_pipes::BlockingMode` (`Wait`/`NoWait`) by the `From` impl below.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum BlockingMode {
    /// Operations wait until they can complete (maps to `named_pipes::BlockingMode::Wait`).
    Blocking,
    /// Operations return immediately (maps to `named_pipes::BlockingMode::NoWait`).
    Nonblocking,
}
37
38
impl From<FramingMode> for named_pipes::FramingMode {
39
fn from(framing_mode: FramingMode) -> Self {
40
match framing_mode {
41
FramingMode::Message => named_pipes::FramingMode::Message,
42
FramingMode::Byte => named_pipes::FramingMode::Byte,
43
}
44
}
45
}
46
47
impl From<BlockingMode> for named_pipes::BlockingMode {
48
fn from(blocking_mode: BlockingMode) -> Self {
49
match blocking_mode {
50
BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
51
BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
52
}
53
}
54
}
55
56
/// Default pipe buffer size (50 KiB) used by [`StreamChannel::pair`]. It is also the
/// default `send_buffer_size` cap enforced on message-mode writes in `write_immutable`.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
57
58
/// An abstraction over named pipes and unix socketpairs.
///
/// WARNING: partial reads of messages behave differently depending on the platform.
/// See sys::unix::StreamChannel::inner_read for details.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than
/// `crate::windows::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
///
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
#[derive(Deserialize, Debug)]
pub struct StreamChannel {
    // The underlying named pipe end used for actual data transfer.
    pipe_conn: named_pipes::PipeConnection,
    // Signaled by this end after a successful write. In `from_pipes`, one end's
    // `write_notify` is the other end's `read_notify` (the same Event, cloned).
    write_notify: Event,
    // Signaled when this end has data available to read; exposed via `ReadNotifier`.
    read_notify: Event,
    // Signaled from `Drop` by whichever end is responsible for closing the channel;
    // exposed via `CloseNotifier`.
    pipe_closed: Event,

    // Held when reading on this end, to prevent additional writes from corrupting notification
    // state.
    remote_write_lock: MultiProcessMutex,

    // Held when a write is made on this end, so that if the remote end is reading, we wait to
    // write to avoid corrupting notification state.
    local_write_lock: MultiProcessMutex,

    // Held for the entire duration of a read. This enables the StreamChannel to be sync,
    // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
    //
    // In practice, there is no use-case for multiple threads actually contending over
    // reading from a single pipe through StreamChannel, so this is mostly to provide a
    // compiler guarantee while passing the StreamChannel to/from background executors.
    //
    // Note that this mutex does not work across processes, so the same StreamChannel end should
    // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
    // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
    // which is not well defined behavior.)
    //
    // Skipped in serialization; a deserialized copy gets a fresh, unshared lock.
    #[serde(skip)]
    #[serde(default = "create_read_lock")]
    read_lock: Arc<Mutex<()>>,

    // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
    // channel end has been serialized. Once serialized, we know that the current end MUST NOT
    // signal the channel has been closed when it was dropped, because a copy of it was sent to
    // another process. It is the copy's responsibility to close the pipe.
    //
    // Skipped in serialization; a deserialized copy defaults to `true` (it owns closing).
    #[serde(skip)]
    #[serde(default = "create_true_mutex")]
    is_channel_closed_on_drop: Mutex<bool>,

    // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
    // up to that size.
    send_buffer_size: usize,
}
113
114
fn create_read_lock() -> Arc<Mutex<()>> {
115
Arc::new(Mutex::new(()))
116
}
117
118
fn create_true_mutex() -> Mutex<bool> {
119
Mutex::new(true)
120
}
121
122
/// Serialize is manually implemented because we need to tell the local copy that a remote copy
/// exists, and to not send the close event. Our serialization is otherwise identical to what
/// derive would have generated.
///
/// NOTE(maintainers): the field names and the count of 7 must stay in sync with the
/// `#[derive(Deserialize)]` on `StreamChannel` — `read_lock` and
/// `is_channel_closed_on_drop` are `#[serde(skip)]` there and are intentionally
/// omitted here.
impl Serialize for StreamChannel {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut s = serializer.serialize_struct("StreamChannel", 7)?;
        s.serialize_field("pipe_conn", &self.pipe_conn)?;
        s.serialize_field("write_notify", &self.write_notify)?;
        s.serialize_field("read_notify", &self.read_notify)?;
        s.serialize_field("pipe_closed", &self.pipe_closed)?;
        s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
        s.serialize_field("local_write_lock", &self.local_write_lock)?;
        s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
        let ret = s.end();

        // Because this end has been serialized, the serialized copy is now responsible for setting
        // the close event. (Serde only gives us `&self`, hence the interior mutability via
        // the `is_channel_closed_on_drop` mutex.)
        if ret.is_ok() {
            *self.is_channel_closed_on_drop.lock() = false;
        }

        ret
    }
}
149
150
impl Drop for StreamChannel {
151
fn drop(&mut self) {
152
if *self.is_channel_closed_on_drop.lock() {
153
if let Err(e) = self.pipe_closed.signal() {
154
warn!("failed to notify on channel drop: {}", e);
155
}
156
}
157
}
158
}
159
160
impl StreamChannel {
161
pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
162
// Safe because the pipe is open.
163
if nonblocking {
164
self.pipe_conn
165
.set_blocking(&named_pipes::BlockingMode::NoWait)
166
} else {
167
self.pipe_conn
168
.set_blocking(&named_pipes::BlockingMode::Wait)
169
}
170
}
171
172
// WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
173
// > 1 reader per end is not defined.
174
pub fn try_clone(&self) -> io::Result<Self> {
175
Ok(StreamChannel {
176
pipe_conn: self.pipe_conn.try_clone()?,
177
write_notify: self.write_notify.try_clone()?,
178
read_notify: self.read_notify.try_clone()?,
179
pipe_closed: self.pipe_closed.try_clone()?,
180
remote_write_lock: self.remote_write_lock.try_clone()?,
181
local_write_lock: self.local_write_lock.try_clone()?,
182
read_lock: self.read_lock.clone(),
183
is_channel_closed_on_drop: create_true_mutex(),
184
send_buffer_size: self.send_buffer_size,
185
})
186
}
187
188
/// Gets the readable byte count. Returns zero for broken pipes since that will cause the read
189
/// notifier to be set, and for the consumer to quickly discover the broken pipe.
190
fn get_readable_byte_count(&self) -> io::Result<u32> {
191
match self.pipe_conn.get_available_byte_count() {
192
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
193
Err(e) => {
194
error!("StreamChannel failed to get readable byte count: {}", e);
195
Err(e)
196
}
197
Ok(byte_count) => Ok(byte_count),
198
}
199
}
200
201
pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
202
// We ensure concurrent read safety by holding a lock for the duration of the method.
203
// (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
204
// that the notifier should be set, leading to an invalid notified/readable state which
205
// could stall readers.)
206
let _read_lock = self.read_lock.lock();
207
208
// SAFETY:
209
// Safe because no partial reads are possible, and the underlying code bounds the
210
// read by buf's size.
211
let res = unsafe { self.pipe_conn.read(buf) };
212
213
// The entire goal of this complex section is to avoid the need for shared memory between
214
// each channel end to synchronize the notification state. It is very subtle, modify with
215
// care.
216
loop {
217
// No other thread is reading, so we can find out, without the write lock, whether or
218
// not we need to clear the read notifier. If we don't, then we don't even have to try
219
// acquiring the write lock. This avoids deadlocks where the pipe is full and the write
220
// side blocks on a writing with the lock held. If it looks like we do need to clear
221
// the notifier though, then we have to be sure, so we'll proceed to the next section.
222
let byte_count = self.get_readable_byte_count()?;
223
if byte_count > 0 {
224
// It's always safe to set the read notifier here because we know there is data in
225
// the pipe, and no one else could read it out from under us.
226
self.read_notify.signal().map_err(|e| {
227
io::Error::other(format!("failed to write to read notifier: {e:?}"))
228
})?;
229
230
// Notifier state has been safely synced.
231
return res;
232
}
233
234
// At this point, there *may* be no data in the pipe, meaning we may want to clear the
235
// notifier. Instead of just trying to acquire the lock outright which could deadlock
236
// with the writing side, we'll try with a timeout. If it fails, we know that the other
237
// side is in the middle of a write, so there either will be data in the pipe soon (a),
238
// or there won't be and we have to clear a spurious notification (b).
239
//
240
// For (a), we can safely return from the read without needing the lock, so we just come
241
// around in the loop to check again, and the loop will exit quickly.
242
//
243
// For (b) we'll return to this point and acquire the lock, as we're just waiting for
244
// the spurious notification to arrive so we can clear it (that code path is very fast),
245
// and the loop will exit.
246
//
247
// If we successfully acquire the lock though, then we can go ahead and clear the
248
// notifier if the pipe is indeed empty, because we are assured that no writes are
249
// happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
250
// that's a decent balance between avoiding an unnecessary iteration, and minimizing
251
// latency.
252
if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
253
let byte_count = self.get_readable_byte_count()?;
254
if byte_count > 0 {
255
// Safe because no one else can be reading from the pipe.
256
self.read_notify.signal().map_err(|e| {
257
io::Error::other(format!("failed to write to read notifier: {e:?}"))
258
})?;
259
} else {
260
// Safe because no other writes can be happening (_lock is held).
261
self.read_notify.reset().map_err(|e| {
262
io::Error::other(format!("failed to reset read notifier: {e:?}"))
263
})?;
264
}
265
266
// Notifier state has been safely synced.
267
return res;
268
}
269
}
270
}
271
272
/// Exists as a workaround for Tube which does not expect its transport to be mutable,
273
/// even though io::Write requires it.
274
pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
275
if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
276
&& buf.len() > self.send_buffer_size
277
{
278
return Err(io::Error::other(format!(
279
"StreamChannel forbids message mode writes larger than the \
280
default buffer size of {}.",
281
self.send_buffer_size,
282
)));
283
}
284
285
let _lock = self.local_write_lock.lock();
286
let res = self.pipe_conn.write(buf);
287
288
// We can always set the write notifier because we know that the reader is in one of the
289
// following states:
290
// 1) a read is running, and it consumes these bytes, so the notification is
291
// unnecessary. That's fine, because the reader will resync the notifier state once
292
// it finishes reading.
293
// 2) a read has completed and is blocked on the lock. The notification state is
294
// already correct, and the read's resync won't change that.
295
if res.is_ok() {
296
self.write_notify.signal().map_err(|e| {
297
io::Error::other(format!("failed to write to read notifier: {e:?}"))
298
})?;
299
}
300
301
res
302
}
303
304
/// This only works with empty pipes. U.B. will result if used in any other scenario.
305
pub fn from_pipes(
306
pipe_a: PipeConnection,
307
pipe_b: PipeConnection,
308
send_buffer_size: usize,
309
) -> Result<(StreamChannel, StreamChannel)> {
310
let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
311
let pipe_closed = Event::new()?;
312
313
let write_lock_a = MultiProcessMutex::new()?;
314
let write_lock_b = MultiProcessMutex::new()?;
315
316
let sock_a = StreamChannel {
317
pipe_conn: pipe_a,
318
write_notify: notify_a_write.try_clone()?,
319
read_notify: notify_b_write.try_clone()?,
320
read_lock: Arc::new(Mutex::new(())),
321
local_write_lock: write_lock_a.try_clone()?,
322
remote_write_lock: write_lock_b.try_clone()?,
323
pipe_closed: pipe_closed.try_clone()?,
324
is_channel_closed_on_drop: create_true_mutex(),
325
send_buffer_size,
326
};
327
let sock_b = StreamChannel {
328
pipe_conn: pipe_b,
329
write_notify: notify_b_write,
330
read_notify: notify_a_write,
331
read_lock: Arc::new(Mutex::new(())),
332
local_write_lock: write_lock_b,
333
remote_write_lock: write_lock_a,
334
pipe_closed,
335
is_channel_closed_on_drop: create_true_mutex(),
336
send_buffer_size,
337
};
338
Ok((sock_a, sock_b))
339
}
340
341
/// Create a pair with a specific buffer size. Note that this is the only way to send messages
342
/// larger than the default named pipe buffer size.
343
pub fn pair_with_buffer_size(
344
blocking_mode: BlockingMode,
345
framing_mode: FramingMode,
346
buffer_size: usize,
347
) -> Result<(StreamChannel, StreamChannel)> {
348
let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
349
&named_pipes::FramingMode::from(framing_mode),
350
&named_pipes::BlockingMode::from(blocking_mode),
351
0,
352
buffer_size,
353
false,
354
)?;
355
Self::from_pipes(pipe_a, pipe_b, buffer_size)
356
}
357
/// Creates a cross platform channel pair.
358
/// On Windows the result is in the form (server, client).
359
pub fn pair(
360
blocking_mode: BlockingMode,
361
framing_mode: FramingMode,
362
) -> Result<(StreamChannel, StreamChannel)> {
363
let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
364
&named_pipes::FramingMode::from(framing_mode),
365
&named_pipes::BlockingMode::from(blocking_mode),
366
0,
367
DEFAULT_BUFFER_SIZE,
368
false,
369
)?;
370
Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
371
}
372
373
/// Blocks until the pipe buffer is empty.
374
/// NOTE: that this will only work for server pipes on Windows.
375
pub fn flush_blocking(&self) -> io::Result<()> {
376
self.pipe_conn.flush_data_blocking()
377
}
378
379
pub(crate) fn get_read_notifier_event(&self) -> &Event {
380
&self.read_notify
381
}
382
383
pub(crate) fn get_close_notifier_event(&self) -> &Event {
384
&self.pipe_closed
385
}
386
}
387
388
impl io::Write for StreamChannel {
389
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
390
self.write_immutable(buf)
391
}
392
fn flush(&mut self) -> io::Result<()> {
393
// There is no userspace buffering inside crosvm to flush for named pipes. We write
394
// directly to the named pipe using WriteFile.
395
Ok(())
396
}
397
}
398
399
impl AsRawDescriptor for &StreamChannel {
    // The channel's raw descriptor is the underlying pipe connection's descriptor.
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.pipe_conn.as_raw_descriptor()
    }
}
404
405
impl ReadNotifier for StreamChannel {
    /// Returns a RawDescriptor that can be polled for reads using WaitContext.
    // Backed by `read_notify`, which `inner_read`/the peer's writes keep in sync
    // with whether the pipe has readable data.
    fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
        &self.read_notify
    }
}
411
412
impl CloseNotifier for StreamChannel {
    // Backed by `pipe_closed`, which is signaled from `Drop` by the end responsible
    // for closing the channel.
    fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
        &self.pipe_closed
    }
}
417
418
impl io::Read for StreamChannel {
    // Reads are routed through `inner_read`, which also resynchronizes the read
    // notifier state.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}
423
424
// Also implemented for shared references: `inner_read` only needs `&self` (it
// serializes concurrent readers internally via `read_lock`).
impl io::Read for &StreamChannel {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner_read(buf)
    }
}
429
430
impl AsRawDescriptor for StreamChannel {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        // `(&self)` takes a `&&StreamChannel`, so this dispatches to the
        // `impl AsRawDescriptor for &StreamChannel` above rather than recursing.
        (&self).as_raw_descriptor()
    }
}
435
436
#[cfg(test)]
mod test {
    use std::io::Read;
    use std::io::Write;
    use std::time::Duration;

    use super::super::EventContext;
    use super::super::EventTrigger;
    use super::*;
    use crate::EventToken;
    use crate::ReadNotifier;

    #[derive(EventToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    // Verifies the read notifier stays signaled as long as unread bytes remain in
    // the pipe, even across partial reads.
    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        sender.write_all(&[1, 2]).unwrap();

        // Wait for the write to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

        // Read just one byte. This leaves another byte in the pipe.
        let mut recv_buffer = [0u8; 1];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 1);

        // The notifier should still be set, because the pipe has data.
        assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 1);
        assert_eq!(recv_buffer[0], 2);
    }

    // Regression test: a writer blocked on a full pipe must not deadlock a
    // concurrent reader (the notifier-sync path in inner_read avoids holding the
    // write lock while the writer is blocked).
    #[test]
    fn test_blocked_writer_wont_deadlock() {
        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();
        const NUM_OPS: usize = 100;

        // We set the buffer size to 100 bytes. It seems that we must exceed that buffer size by
        // 100x before we run into a blocking write, so that's what we do here. This makes sense
        // to a degree because the docs suggest that some automatic expansion of a pipe's buffers
        // is supposed to be handled by the kernel.
        let writer = std::thread::spawn(move || {
            let buf = [0u8; 100];
            for _ in 0..NUM_OPS {
                assert_eq!(writer.write(&buf).unwrap(), buf.len());
            }
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut buf = [0u8; 100];
        for _ in 0..NUM_OPS {
            assert_eq!(reader.read(&mut buf).unwrap(), buf.len());
        }

        // Writer must exit cleanly.
        writer.join().unwrap();
    }

    // Verifies byte-mode data can be read in chunks on a non-blocking pair, and
    // that the notifier clears once the pipe is drained.
    #[test]
    fn test_non_blocking_pair() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();

        sender.write_all(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec! {Token::ReceivedData});

        // Smaller than what we sent so we get multiple chunks
        let mut recv_buffer: [u8; 4] = [0; 4];

        let mut size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);

        // Now that we've polled for & received all data, polling again should show no events.
        assert_eq!(
            event_ctx
                .wait_timeout(std::time::Duration::new(0, 0))
                .unwrap()
                .len(),
            0
        );
    }

    // Verifies that a non-blocking read on an empty pipe returns an error rather
    // than blocking.
    #[test]
    fn test_non_blocking_pair_error_no_data() {
        let (mut sender, mut receiver) =
            StreamChannel::pair(BlockingMode::Nonblocking, FramingMode::Byte).unwrap();
        receiver
            .set_nonblocking(true)
            .expect("Failed to set receiver to nonblocking mode.");

        sender.write_all(&[75, 77]).unwrap();

        // Wait for the data to arrive.
        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            receiver.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();
        let events = event_ctx.wait().unwrap();
        let tokens: Vec<Token> = events
            .iter()
            .filter(|e| e.is_readable)
            .map(|e| e.token)
            .collect();
        assert_eq!(tokens, vec! {Token::ReceivedData});

        // We only read 2 bytes, even though we requested 4 bytes.
        let mut recv_buffer: [u8; 4] = [0; 4];
        let size = receiver.read(&mut recv_buffer).unwrap();
        assert_eq!(size, 2);
        assert_eq!(recv_buffer, [75, 77, 00, 00]);

        // Further reads should encounter an error since there is no available data and this is a
        // non blocking pipe.
        assert!(receiver.read(&mut recv_buffer).is_err());
    }
}
588
589