Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/base/src/sys/windows/named_pipes.rs
5394 views
1
// Copyright 2022 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::ffi::CString;
6
use std::fs::OpenOptions;
7
use std::io;
8
use std::io::Result;
9
use std::mem;
10
use std::os::windows::fs::OpenOptionsExt;
11
use std::process;
12
use std::ptr;
13
use std::sync::atomic::AtomicBool;
14
use std::sync::atomic::AtomicUsize;
15
use std::sync::atomic::Ordering;
16
use std::sync::Arc;
17
18
use serde::Deserialize;
19
use serde::Serialize;
20
use sync::Mutex;
21
use win_util::fail_if_zero;
22
use win_util::SecurityAttributes;
23
use win_util::SelfRelativeSecurityDescriptor;
24
use winapi::shared::minwindef::DWORD;
25
use winapi::shared::minwindef::FALSE;
26
use winapi::shared::minwindef::TRUE;
27
use winapi::shared::winerror::ERROR_BROKEN_PIPE;
28
use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
29
use winapi::shared::winerror::ERROR_IO_PENDING;
30
use winapi::shared::winerror::ERROR_MORE_DATA;
31
use winapi::shared::winerror::ERROR_NO_DATA;
32
use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
33
use winapi::um::errhandlingapi::GetLastError;
34
use winapi::um::fileapi::FlushFileBuffers;
35
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
36
use winapi::um::ioapiset::CancelIoEx;
37
use winapi::um::ioapiset::GetOverlappedResult;
38
use winapi::um::minwinbase::OVERLAPPED;
39
use winapi::um::namedpipeapi::ConnectNamedPipe;
40
use winapi::um::namedpipeapi::DisconnectNamedPipe;
41
use winapi::um::namedpipeapi::GetNamedPipeInfo;
42
use winapi::um::namedpipeapi::PeekNamedPipe;
43
use winapi::um::namedpipeapi::SetNamedPipeHandleState;
44
use winapi::um::winbase::CreateNamedPipeA;
45
use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
46
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
47
use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
48
use winapi::um::winbase::PIPE_NOWAIT;
49
use winapi::um::winbase::PIPE_READMODE_BYTE;
50
use winapi::um::winbase::PIPE_READMODE_MESSAGE;
51
use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
52
use winapi::um::winbase::PIPE_TYPE_BYTE;
53
use winapi::um::winbase::PIPE_TYPE_MESSAGE;
54
use winapi::um::winbase::PIPE_WAIT;
55
use winapi::um::winbase::SECURITY_IDENTIFICATION;
56
57
use super::RawDescriptor;
58
use crate::descriptor::AsRawDescriptor;
59
use crate::descriptor::FromRawDescriptor;
60
use crate::descriptor::IntoRawDescriptor;
61
use crate::descriptor::SafeDescriptor;
62
use crate::Event;
63
use crate::EventToken;
64
use crate::WaitContext;
65
66
/// Default size, in bytes, used for the buffers of every named pipe created by this module.
///
/// Writers that expect a write not to block *can* block when this is too small, because they
/// have to wait for the reading side to drain the buffer. As a rule it should be *at least* as
/// large as the largest message sent over the pipe.
///
/// An undersized buffer interacts badly with crate::windows::StreamChannel, which expects to
/// finish a complete write before releasing a lock that the opposite side needs in order to
/// complete a read:
/// * The writer cannot finish its write, and so cannot release the lock, because the buffer is
///   too small.
/// * The reader cannot start reading because the writer holds the lock, so it cannot relieve
///   buffer pressure. For message pipes the reader could not help anyway, because a message mode
///   pipe should NOT have a partial read (which is what relieving pressure would require).
/// * The conditions for deadlock are met: reader and writer wait on each other in a cycle.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
81
82
// Process-wide counter, starting at 1. `pair_with_buffer_size` embeds the value returned by
// `fetch_add` into each generated pipe name.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
83
84
#[remain::sorted]
85
#[derive(Debug, thiserror::Error)]
86
pub enum PipeError {
87
#[error("read zero bytes, but this is not an EOF")]
88
ZeroByteReadNoEof,
89
}
90
91
/// Represents one end of a named pipe
92
///
93
/// NOTE: implementations of Read & Write are trait complaint for EOF/broken pipe handling
94
/// (returning a successful zero byte read), but overlapped read/write versions are NOT (they will
95
/// return broken pipe directly due to API limitations; see PipeConnection::read for
96
/// details).
97
#[derive(Serialize, Deserialize, Debug)]
98
pub struct PipeConnection {
99
handle: SafeDescriptor,
100
framing_mode: FramingMode,
101
blocking_mode: BlockingMode,
102
}
103
104
/// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
105
///
106
/// Defined as a separate type so that we can mark it as `Send` and `Sync`.
107
pub struct BoxedOverlapped(pub Box<OVERLAPPED>);
108
109
// SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
110
// pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
111
unsafe impl Send for BoxedOverlapped {}
112
113
// SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
114
unsafe impl Sync for BoxedOverlapped {}
115
116
/// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
117
/// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.
118
pub struct OverlappedWrapper {
119
overlapped: BoxedOverlapped,
120
// This field prevents the event handle from being dropped too early and allows callers to
121
// be notified when a read or write overlapped operation has completed.
122
h_event: Option<Event>,
123
in_use: bool,
124
}
125
126
impl OverlappedWrapper {
127
pub fn get_h_event_ref(&self) -> Option<&Event> {
128
self.h_event.as_ref()
129
}
130
131
/// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
132
/// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
133
/// returned must not be dropped.
134
///
135
/// There is an option to create the event object and set it to the `hEvent` field. If hEvent
136
/// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
137
/// handle will be signaled when the operation is complete. In other words, you can use
138
/// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
139
/// Microsoft though.
140
pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
141
let mut overlapped = OVERLAPPED::default();
142
let h_event = if include_event {
143
Some(Event::new()?)
144
} else {
145
None
146
};
147
148
overlapped.hEvent = if let Some(event) = h_event.as_ref() {
149
event.as_raw_descriptor()
150
} else {
151
0 as RawDescriptor
152
};
153
154
Ok(OverlappedWrapper {
155
overlapped: BoxedOverlapped(Box::new(overlapped)),
156
h_event,
157
in_use: false,
158
})
159
}
160
}
161
162
pub trait WriteOverlapped {
163
/// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
164
/// If successful, the write operation will complete asynchronously, and
165
/// `write_result()` should be called to get the result.
166
///
167
/// # Safety
168
/// `buf` and `overlapped_wrapper` will be in use for the duration of
169
/// the overlapped operation. These must not be reused and must live until
170
/// after `write_result()` has been called.
171
unsafe fn write_overlapped(
172
&mut self,
173
buf: &mut [u8],
174
overlapped_wrapper: &mut OverlappedWrapper,
175
) -> io::Result<()>;
176
177
/// Gets the result of the overlapped write operation. Must only be called
178
/// after issuing an overlapped write operation using `write_overlapped`. The
179
/// same `overlapped_wrapper` must be provided.
180
fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
181
182
/// Tries to get the result of the overlapped write operation. Must only be
183
/// called once, and only after issuing an overlapped write operation using
184
/// `write_overlapped`. The same `overlapped_wrapper` must be provided.
185
///
186
/// An error indicates that the operation hasn't completed yet and
187
/// `write_result` or `try_write_result` should be called again.
188
fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
189
-> io::Result<usize>;
190
}
191
192
pub trait ReadOverlapped {
193
/// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
194
/// If successful, the read operation will complete asynchronously, and
195
/// `read_result()` should be called to get the result.
196
///
197
/// # Safety
198
/// `buf` and `overlapped_wrapper` will be in use for the duration of
199
/// the overlapped operation. These must not be reused and must live until
200
/// after `read_result()` has been called.
201
unsafe fn read_overlapped(
202
&mut self,
203
buf: &mut [u8],
204
overlapped_wrapper: &mut OverlappedWrapper,
205
) -> io::Result<()>;
206
207
/// Gets the result of the overlapped read operation. Must only be called
208
/// once, and only after issuing an overlapped read operation using
209
/// `read_overlapped`. The same `overlapped_wrapper` must be provided.
210
fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
211
212
/// Tries to get the result of the overlapped read operation. Must only be called
213
/// after issuing an overlapped read operation using `read_overlapped`. The
214
/// same `overlapped_wrapper` must be provided.
215
///
216
/// An error indicates that the operation hasn't completed yet and
217
/// `read_result` or `try_read_result` should be called again.
218
fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
219
}
220
221
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
222
pub enum FramingMode {
223
Byte,
224
Message,
225
}
226
227
impl FramingMode {
228
fn to_readmode(self) -> DWORD {
229
match self {
230
FramingMode::Message => PIPE_READMODE_MESSAGE,
231
FramingMode::Byte => PIPE_READMODE_BYTE,
232
}
233
}
234
235
fn to_pipetype(self) -> DWORD {
236
match self {
237
FramingMode::Message => PIPE_TYPE_MESSAGE,
238
FramingMode::Byte => PIPE_TYPE_BYTE,
239
}
240
}
241
}
242
243
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
244
pub enum BlockingMode {
245
/// Calls to read() block until data is received
246
Wait,
247
/// Calls to read() return immediately even if there is nothing read with error code 232
248
/// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
249
///
250
/// NOTE: This mode is discouraged by the Windows API documentation.
251
NoWait,
252
}
253
254
impl From<&BlockingMode> for DWORD {
255
fn from(blocking_mode: &BlockingMode) -> DWORD {
256
match blocking_mode {
257
BlockingMode::Wait => PIPE_WAIT,
258
BlockingMode::NoWait => PIPE_NOWAIT,
259
}
260
}
261
}
262
263
/// Sets the handle state for a named pipe in a rust friendly way.
264
/// SAFETY:
265
/// This is safe if the pipe handle is open.
266
unsafe fn set_named_pipe_handle_state(
267
pipe_handle: RawDescriptor,
268
client_mode: &mut DWORD,
269
) -> Result<()> {
270
// Safe when the pipe handle is open. Safety also requires checking the return value, which we
271
// do below.
272
let success_flag = SetNamedPipeHandleState(
273
/* hNamedPipe= */ pipe_handle,
274
/* lpMode= */ client_mode,
275
/* lpMaxCollectionCount= */ ptr::null_mut(),
276
/* lpCollectDataTimeout= */ ptr::null_mut(),
277
);
278
if success_flag == 0 {
279
Err(io::Error::last_os_error())
280
} else {
281
Ok(())
282
}
283
}
284
285
pub fn pair(
286
framing_mode: &FramingMode,
287
blocking_mode: &BlockingMode,
288
timeout: u64,
289
) -> Result<(PipeConnection, PipeConnection)> {
290
pair_with_buffer_size(
291
framing_mode,
292
blocking_mode,
293
timeout,
294
DEFAULT_BUFFER_SIZE,
295
false,
296
)
297
}
298
299
/// Creates a pair of handles connected to either end of a duplex named pipe.
300
///
301
/// The pipe created will have a semi-random name and a default set of security options that
302
/// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
303
/// remote clients, allow only a single server instance, and prevent impersonation by the server
304
/// end of the pipe.
305
///
306
/// # Arguments
307
///
308
/// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
309
/// automatically framed sequence of messages (Message). In message mode it's an error to read
310
/// fewer bytes than were sent in a message from the other end of the pipe.
311
/// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
312
/// return immediately if there is nothing available (NoWait).
313
/// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
314
/// zero will create sockets with the system default timeout.
315
/// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
316
/// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
317
/// writes that don't fit in the buffer.
318
/// # Return value
319
///
320
/// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
321
/// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
322
pub fn pair_with_buffer_size(
323
framing_mode: &FramingMode,
324
blocking_mode: &BlockingMode,
325
timeout: u64,
326
buffer_size: usize,
327
overlapped: bool,
328
) -> Result<(PipeConnection, PipeConnection)> {
329
// Give the pipe a unique name to avoid accidental collisions
330
let pipe_name = format!(
331
r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
332
process::id(),
333
NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
334
rand::random::<u32>(),
335
);
336
337
let server_end = create_server_pipe(
338
&pipe_name,
339
framing_mode,
340
blocking_mode,
341
timeout,
342
buffer_size,
343
overlapped,
344
)?;
345
346
// Open the named pipe we just created as the client
347
let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
348
349
// Accept the client's connection
350
// Not sure if this is strictly needed but I'm doing it just in case.
351
// We expect at this point that the client will already be connected,
352
// so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
353
// It's also OK if we get a return code of success.
354
server_end.wait_for_client_connection()?;
355
356
Ok((server_end, client_end))
357
}
358
359
/// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
360
/// settings.
361
///
362
/// The pipe will be set to reject remote clients and allow only a single connection at a time.
363
///
364
/// # Arguments
365
///
366
/// * `pipe_name` - The path of the named pipe to create. Should be in the form
367
/// `\\.\pipe\<some-name>`.
368
/// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
369
/// automatically framed sequence of messages (Message). In message mode it's an error to read
370
/// fewer bytes than were sent in a message from the other end of the pipe.
371
/// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
372
/// return immediately if there is nothing available (NoWait).
373
/// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
374
/// zero will create sockets with the system default timeout.
375
/// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
376
/// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
377
/// writes that don't fit in the buffer.
378
/// * `overlapped` - Sets whether overlapped mode is set on the pipe.
379
pub fn create_server_pipe(
380
pipe_name: &str,
381
framing_mode: &FramingMode,
382
blocking_mode: &BlockingMode,
383
timeout: u64,
384
buffer_size: usize,
385
overlapped: bool,
386
) -> Result<PipeConnection> {
387
let c_pipe_name = CString::new(pipe_name).unwrap();
388
389
let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
390
if overlapped {
391
open_mode_flags |= FILE_FLAG_OVERLAPPED
392
}
393
394
// This sets flags so there will be an error if >1 instance (server end)
395
// of this pipe name is opened because we expect exactly one.
396
// SAFETY:
397
// Safe because security attributes are valid, pipe_name is valid C string,
398
// and we're checking the return code
399
let server_handle = unsafe {
400
CreateNamedPipeA(
401
c_pipe_name.as_ptr(),
402
/* dwOpenMode= */
403
open_mode_flags,
404
/* dwPipeMode= */
405
framing_mode.to_pipetype()
406
| framing_mode.to_readmode()
407
| DWORD::from(blocking_mode)
408
| PIPE_REJECT_REMOTE_CLIENTS,
409
/* nMaxInstances= */ 1,
410
/* nOutBufferSize= */ buffer_size as DWORD,
411
/* nInBufferSize= */ buffer_size as DWORD,
412
/* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
413
/* lpSecurityAttributes= */
414
SecurityAttributes::new_with_security_descriptor(
415
SelfRelativeSecurityDescriptor::get_singleton(),
416
/* inherit= */ true,
417
)
418
.as_mut(),
419
)
420
};
421
422
if server_handle == INVALID_HANDLE_VALUE {
423
Err(io::Error::last_os_error())
424
} else {
425
// SAFETY: Safe because server_handle is valid.
426
unsafe {
427
Ok(PipeConnection {
428
handle: SafeDescriptor::from_raw_descriptor(server_handle),
429
framing_mode: *framing_mode,
430
blocking_mode: *blocking_mode,
431
})
432
}
433
}
434
}
435
436
/// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
437
/// settings.
438
///
439
/// The pipe will be set to prevent impersonation of the client by the server process.
440
///
441
/// # Arguments
442
///
443
/// * `pipe_name` - The path of the named pipe to create. Should be in the form
444
/// `\\.\pipe\<some-name>`.
445
/// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
446
/// automatically framed sequence of messages (Message). In message mode it's an error to read
447
/// fewer bytes than were sent in a message from the other end of the pipe.
448
/// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
449
/// return immediately if there is nothing available (NoWait).
450
/// * `overlapped` - Sets whether the pipe is opened in overlapped mode.
451
pub fn create_client_pipe(
452
pipe_name: &str,
453
framing_mode: &FramingMode,
454
blocking_mode: &BlockingMode,
455
overlapped: bool,
456
) -> Result<PipeConnection> {
457
let client_handle = OpenOptions::new()
458
.read(true)
459
.write(true)
460
.create(true)
461
.security_qos_flags(SECURITY_IDENTIFICATION)
462
.custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
463
.open(pipe_name)?
464
.into_raw_descriptor();
465
466
let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
467
468
// SAFETY:
469
// Safe because client_handle's open() call did not return an error.
470
unsafe {
471
set_named_pipe_handle_state(client_handle, &mut client_mode)?;
472
}
473
474
Ok(PipeConnection {
475
// SAFETY:
476
// Safe because client_handle is valid
477
handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
478
framing_mode: *framing_mode,
479
blocking_mode: *blocking_mode,
480
})
481
}
482
483
/// Marker for element types that may be passed through the generic pipe helpers
/// (write_to_pipe and read_from_pipe).
pub trait PipeSendable {
    /// Value used to fill newly added slots when a buffer is resized to a larger size.
    fn default() -> Self;
}

impl PipeSendable for u8 {
    /// Bytes are filled with zero.
    fn default() -> Self {
        0u8
    }
}
495
impl PipeSendable for RawDescriptor {
496
fn default() -> Self {
497
ptr::null_mut()
498
}
499
}
500
501
impl PipeConnection {
502
pub fn try_clone(&self) -> Result<PipeConnection> {
503
let copy_handle = self.handle.try_clone()?;
504
Ok(PipeConnection {
505
handle: copy_handle,
506
framing_mode: self.framing_mode,
507
blocking_mode: self.blocking_mode,
508
})
509
}
510
511
/// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
512
/// blocking modes.
513
///
514
/// # Safety
515
/// 1. rd is valid and ownership is transferred to this function when it is called.
516
///
517
/// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
518
/// underlying pipe.
519
pub unsafe fn from_raw_descriptor(
520
rd: RawDescriptor,
521
framing_mode: FramingMode,
522
blocking_mode: BlockingMode,
523
) -> PipeConnection {
524
PipeConnection {
525
handle: SafeDescriptor::from_raw_descriptor(rd),
526
framing_mode,
527
blocking_mode,
528
}
529
}
530
531
/// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
532
/// Returns the number of bytes (not values) read.
533
///
534
/// # Safety
535
///
536
/// This is safe only when the following conditions hold:
537
/// 1. The data on the other end of the pipe is a valid binary representation of data for
538
/// type T, and
539
/// 2. The number of bytes read is a multiple of the size of T; this must be checked by
540
/// the caller.
541
/// If buf's type is file descriptors, this is only safe when those file descriptors are valid
542
/// for the process where this function was called.
543
pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
544
match PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None) {
545
// Windows allows for zero byte writes on one end of a pipe to be read by the other as
546
// zero byte reads. These zero byte reads DO NOT signify EOF, so from the perspective
547
// of std::io::Read, they cannot be reported as Ok(0). We translate them to errors.
548
//
549
// Within CrosVM, this behavior is not used, but it has been implemented to avoid UB
550
// either in the future, or when talking to non CrosVM named pipes. If we need to
551
// actually use/understand this error from other parts of KiwiVM (e.g. PipeConnection
552
// consumers), we could use ErrorKind::Interrupted (which as of 24/11/26 is not used by
553
// Rust for other purposes).
554
Ok(len) if len == 0 && !buf.is_empty() => {
555
Err(io::Error::other(PipeError::ZeroByteReadNoEof))
556
}
557
558
// Read at least 1 byte, or 0 bytes if a zero byte buffer was provided.
559
Ok(len) => Ok(len),
560
561
// Treat a closed pipe like an EOF, because that is consistent with the Read trait.
562
//
563
// NOTE: this is explicitly NOT done for overlapped operations for a few reasons:
564
// 1. Overlapped operations do not follow the Read trait, so there is no strong reason
565
// *to* do it.
566
// 2. Ok(0) also means "overlapped operation started successfully." This is a real
567
// problem because the general pattern is to start an overlapped operation and then
568
// wait for it. So if we did that and the Ok(0) meant the pipe is closed, we would
569
// enter an infinite wait. (The kernel already told us when we started the operation
570
// that the pipe was closed. It won't tell us again.)
571
Err(e) if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) => Ok(0),
572
573
Err(e) => Err(e),
574
}
575
}
576
577
/// Similar to `PipeConnection::read` except it also allows:
578
/// 1. The same end of the named pipe to read and write at the same time in different
579
/// threads.
580
/// 2. Asynchronous read and write (read and write won't block).
581
///
582
/// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
583
/// (can be created with `OverlappedWrapper::new`) will be passed into
584
/// `ReadFile`. That event will be triggered when the read operation is complete.
585
///
586
/// In order to get how many bytes were read, call `get_overlapped_result`. That function will
587
/// also help with waiting until the read operation is complete.
588
///
589
/// # Safety
590
///
591
/// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
592
/// overlapped mode otherwise there may be unexpected behavior.
593
pub unsafe fn read_overlapped<T: PipeSendable>(
594
&mut self,
595
buf: &mut [T],
596
overlapped_wrapper: &mut OverlappedWrapper,
597
) -> Result<()> {
598
if overlapped_wrapper.in_use {
599
return Err(std::io::Error::new(
600
std::io::ErrorKind::InvalidInput,
601
"Overlapped struct already in use",
602
));
603
}
604
overlapped_wrapper.in_use = true;
605
606
PipeConnection::read_internal(
607
&self.handle,
608
self.blocking_mode,
609
buf,
610
Some(&mut overlapped_wrapper.overlapped.0),
611
)?;
612
Ok(())
613
}
614
615
/// Helper for `read_overlapped` and `read`
616
///
617
/// # Safety
618
/// Comments `read_overlapped` or `read`, depending on which is used.
619
unsafe fn read_internal<T: PipeSendable>(
620
handle: &SafeDescriptor,
621
blocking_mode: BlockingMode,
622
buf: &mut [T],
623
overlapped: Option<&mut OVERLAPPED>,
624
) -> Result<usize> {
625
let res = crate::windows::read_file(
626
handle,
627
buf.as_mut_ptr() as *mut u8,
628
mem::size_of_val(buf),
629
overlapped,
630
);
631
match res {
632
Ok(bytes_read) => Ok(bytes_read),
633
// For message mode pipes, if the buffer is too small for the entire message, the kernel
634
// will return ERROR_MORE_DATA. This isn't strictly an "error" because the operation
635
// succeeds. Making it an error also means it's hard to handle this cleanly from the
636
// perspective of an io::Read consumer. So we discard the non-error, and return the
637
// successful result of filling the entire buffer.
638
Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(buf.len()),
639
Err(e)
640
if blocking_mode == BlockingMode::NoWait
641
&& e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
642
{
643
// A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
644
// this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
645
// correct. For further details see:
646
// https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
647
// https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
648
Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
649
}
650
Err(e) => Err(e),
651
}
652
}
653
654
/// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
655
/// by an event on `exit_event`.
656
pub fn read_overlapped_blocking<T: PipeSendable>(
657
&mut self,
658
buf: &mut [T],
659
overlapped_wrapper: &mut OverlappedWrapper,
660
exit_event: &Event,
661
) -> Result<()> {
662
// SAFETY:
663
// Safe because we are providing a valid buffer slice and also providing a valid
664
// overlapped struct.
665
match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {
666
Err(e) => Err(e),
667
Ok(()) => Ok(()),
668
}?;
669
670
#[derive(EventToken)]
671
enum Token {
672
ReadOverlapped,
673
Exit,
674
}
675
676
let wait_ctx = WaitContext::build_with(&[
677
(
678
overlapped_wrapper.get_h_event_ref().unwrap(),
679
Token::ReadOverlapped,
680
),
681
(exit_event, Token::Exit),
682
])?;
683
684
let events = wait_ctx.wait()?;
685
for event in events {
686
match event.token {
687
Token::ReadOverlapped => {
688
let size_read_in_bytes =
689
self.get_overlapped_result(overlapped_wrapper)? as usize;
690
691
// If this error shows, most likely the overlapped named pipe was set up
692
// incorrectly.
693
if size_read_in_bytes != buf.len() {
694
return Err(std::io::Error::new(
695
std::io::ErrorKind::UnexpectedEof,
696
"Short read",
697
));
698
}
699
}
700
Token::Exit => {
701
return Err(std::io::Error::new(
702
std::io::ErrorKind::Interrupted,
703
"IO canceled on exit request",
704
));
705
}
706
}
707
}
708
709
Ok(())
710
}
711
712
/// Gets the size in bytes of data in the pipe.
713
///
714
/// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
715
/// not finished writing on the producer side.
716
pub fn get_available_byte_count(&self) -> io::Result<u32> {
717
let mut total_bytes_avail: DWORD = 0;
718
719
// SAFETY:
720
// Safe because the underlying pipe handle is guaranteed to be open, and the output values
721
// live at valid memory locations.
722
fail_if_zero!(unsafe {
723
PeekNamedPipe(
724
self.as_raw_descriptor(),
725
ptr::null_mut(),
726
0,
727
ptr::null_mut(),
728
&mut total_bytes_avail,
729
ptr::null_mut(),
730
)
731
});
732
733
Ok(total_bytes_avail)
734
}
735
736
/// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
737
/// callers should check to ensure that it was the number expected.
738
pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
739
// SAFETY: overlapped is None so this is safe.
740
unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
741
}
742
743
/// Similar to `PipeConnection::write` except it also allows:
744
/// 1. The same end of the named pipe to read and write at the same time in different
745
/// threads.
746
/// 2. Asynchronous read and write (read and write won't block).
747
///
748
/// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
749
/// (can be created with `OverlappedWrapper::new`) will be passed into
750
/// `WriteFile`. That event will be triggered when the write operation is complete.
751
///
752
/// In order to get how many bytes were written, call `get_overlapped_result`. That function
753
/// will also help with waiting until the write operation is complete. The pipe must be
754
/// opened in overlapped otherwise there may be unexpected behavior.
755
///
756
/// # Safety
757
/// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
758
pub unsafe fn write_overlapped<T: PipeSendable>(
759
&mut self,
760
buf: &[T],
761
overlapped_wrapper: &mut OverlappedWrapper,
762
) -> Result<()> {
763
if overlapped_wrapper.in_use {
764
return Err(std::io::Error::new(
765
std::io::ErrorKind::InvalidInput,
766
"Overlapped struct already in use",
767
));
768
}
769
overlapped_wrapper.in_use = true;
770
771
PipeConnection::write_internal(
772
&self.handle,
773
buf,
774
Some(&mut overlapped_wrapper.overlapped.0),
775
)?;
776
Ok(())
777
}
778
779
/// Helper for `write_overlapped` and `write`.
780
///
781
/// # Safety
782
/// * Safe if overlapped is None.
783
/// * Safe if overlapped is Some and:
784
/// + buf lives until the overlapped operation is complete.
785
/// + overlapped lives until the overlapped operation is complete.
786
unsafe fn write_internal<T: PipeSendable>(
787
handle: &SafeDescriptor,
788
buf: &[T],
789
overlapped: Option<&mut OVERLAPPED>,
790
) -> Result<usize> {
791
// SAFETY:
792
// Safe because buf points to memory valid until the write completes and we pass a valid
793
// length for that memory.
794
unsafe {
795
crate::windows::write_file(
796
handle,
797
buf.as_ptr() as *const u8,
798
mem::size_of_val(buf),
799
overlapped,
800
)
801
}
802
}
803
804
/// Sets the blocking mode on the pipe.
805
pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
806
let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
807
self.blocking_mode = *blocking_mode;
808
809
// SAFETY:
810
// Safe because the pipe has not been closed (it is managed by this object).
811
unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
812
}
813
814
/// For a server named pipe, waits for a client to connect (blocking).
815
pub fn wait_for_client_connection(&self) -> Result<()> {
816
let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
817
self.wait_for_client_connection_internal(
818
&mut overlapped_wrapper,
819
/* should_block = */ true,
820
)
821
}
822
823
/// Interruptable blocking wait for a client to connect.
824
pub fn wait_for_client_connection_overlapped_blocking(
825
&mut self,
826
exit_event: &Event,
827
) -> Result<()> {
828
let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
829
self.wait_for_client_connection_internal(
830
&mut overlapped_wrapper,
831
/* should_block = */ false,
832
)?;
833
834
#[derive(EventToken)]
835
enum Token {
836
Connected,
837
Exit,
838
}
839
840
let wait_ctx = WaitContext::build_with(&[
841
(
842
overlapped_wrapper.get_h_event_ref().unwrap(),
843
Token::Connected,
844
),
845
(exit_event, Token::Exit),
846
])?;
847
848
let events = wait_ctx.wait()?;
849
if let Some(event) = events.into_iter().next() {
850
return match event.token {
851
Token::Connected => Ok(()),
852
Token::Exit => {
853
// We must cancel IO here because it is unsafe to free the overlapped wrapper
854
// while the IO operation is active.
855
self.cancel_io()?;
856
857
Err(std::io::Error::new(
858
std::io::ErrorKind::Interrupted,
859
"IO canceled on exit request",
860
))
861
}
862
};
863
}
864
unreachable!("wait cannot return Ok with zero events");
865
}
866
867
/// For a server named pipe, waits for a client to connect using the given overlapped wrapper
868
/// to signal connection.
869
pub fn wait_for_client_connection_overlapped(
870
&self,
871
overlapped_wrapper: &mut OverlappedWrapper,
872
) -> Result<()> {
873
self.wait_for_client_connection_internal(
874
overlapped_wrapper,
875
/* should_block = */ false,
876
)
877
}
878
879
fn wait_for_client_connection_internal(
880
&self,
881
overlapped_wrapper: &mut OverlappedWrapper,
882
should_block: bool,
883
) -> Result<()> {
884
// SAFETY:
885
// Safe because the handle is valid and we're checking the return
886
// code according to the documentation
887
//
888
// TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:
889
// overlapped_wrapper must live until the overlapped operation is complete; however,
890
// if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper
891
// could be freed while the operation is still running.
892
unsafe {
893
let success_flag = ConnectNamedPipe(
894
self.as_raw_descriptor(),
895
// Note: The overlapped structure is only used if the pipe was opened in
896
// OVERLAPPED mode, but is necessary in that case.
897
&mut *overlapped_wrapper.overlapped.0,
898
);
899
if success_flag == 0 {
900
return match GetLastError() {
901
ERROR_PIPE_CONNECTED => {
902
if !should_block {
903
// If async, make sure the event is signalled to indicate the client
904
// is ready.
905
overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
906
}
907
908
Ok(())
909
}
910
ERROR_IO_PENDING => {
911
if should_block {
912
overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
913
}
914
Ok(())
915
}
916
err => Err(io::Error::from_raw_os_error(err as i32)),
917
};
918
}
919
}
920
Ok(())
921
}
922
923
/// Used for overlapped read and write operations.
924
///
925
/// This will block until the ReadFile or WriteFile operation that also took in
926
/// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
927
/// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
928
/// the number of bytes that were read or written.
929
pub fn get_overlapped_result(
930
&mut self,
931
overlapped_wrapper: &mut OverlappedWrapper,
932
) -> io::Result<u32> {
933
let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
934
overlapped_wrapper.in_use = false;
935
res
936
}
937
938
/// Used for overlapped read and write operations.
939
///
940
/// This will return immediately, regardless of the completion status of the
941
/// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
942
/// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
943
/// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
944
/// that were read or written, if completed. If the operation hasn't
945
/// completed, an error of kind `io::ErrorKind::WouldBlock` will be
946
/// returned.
947
pub fn try_get_overlapped_result(
948
&mut self,
949
overlapped_wrapper: &mut OverlappedWrapper,
950
) -> io::Result<u32> {
951
let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
952
match res {
953
Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
954
Err(io::Error::new(io::ErrorKind::WouldBlock, err))
955
}
956
_ => {
957
overlapped_wrapper.in_use = false;
958
res
959
}
960
}
961
}
962
963
fn get_overlapped_result_internal(
964
&mut self,
965
overlapped_wrapper: &mut OverlappedWrapper,
966
wait: bool,
967
) -> io::Result<u32> {
968
if !overlapped_wrapper.in_use {
969
return Err(std::io::Error::new(
970
std::io::ErrorKind::InvalidInput,
971
"Overlapped struct is not in use",
972
));
973
}
974
975
let mut size_transferred = 0;
976
// SAFETY:
977
// Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
978
// Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
979
if (unsafe {
980
GetOverlappedResult(
981
self.handle.as_raw_descriptor(),
982
&mut *overlapped_wrapper.overlapped.0,
983
&mut size_transferred,
984
if wait { TRUE } else { FALSE },
985
)
986
}) != 0
987
{
988
Ok(size_transferred)
989
} else {
990
let e = io::Error::last_os_error();
991
match e.raw_os_error() {
992
// More data => partial read of a message on a message pipe. This isn't really an
993
// error (see PipeConnection::read_internal) since we filled the provided buffer.
994
Some(error_code) if error_code as u32 == ERROR_MORE_DATA => Ok(size_transferred),
995
_ => Err(e),
996
}
997
}
998
}
999
1000
/// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
1001
/// cancel all I/O requests for the file handle passed in.
1002
pub fn cancel_io(&mut self) -> Result<()> {
1003
fail_if_zero!(
1004
// SAFETY: descriptor is valid and the return value is checked.
1005
unsafe {
1006
CancelIoEx(
1007
self.handle.as_raw_descriptor(),
1008
/* lpOverlapped= */ std::ptr::null_mut(),
1009
)
1010
}
1011
);
1012
1013
Ok(())
1014
}
1015
1016
/// Get the framing mode of the pipe.
1017
pub fn get_framing_mode(&self) -> FramingMode {
1018
self.framing_mode
1019
}
1020
1021
/// Returns metadata about the connected NamedPipe.
1022
pub fn get_info(&self) -> Result<NamedPipeInfo> {
1023
let mut flags: u32 = 0;
1024
let mut incoming_buffer_size: u32 = 0;
1025
let mut outgoing_buffer_size: u32 = 0;
1026
let mut max_instances: u32 = 0;
1027
// SAFETY: all pointers are valid
1028
fail_if_zero!(unsafe {
1029
GetNamedPipeInfo(
1030
self.as_raw_descriptor(),
1031
&mut flags,
1032
&mut outgoing_buffer_size,
1033
&mut incoming_buffer_size,
1034
&mut max_instances,
1035
)
1036
});
1037
1038
Ok(NamedPipeInfo {
1039
outgoing_buffer_size,
1040
incoming_buffer_size,
1041
max_instances,
1042
flags,
1043
})
1044
}
1045
1046
/// For a server pipe, flush the pipe contents. This will
1047
/// block until the pipe is cleared by the client. Only
1048
/// call this if you are sure the client is reading the
1049
/// data!
1050
pub fn flush_data_blocking(&self) -> Result<()> {
1051
// SAFETY:
1052
// Safe because the only buffers interacted with are
1053
// outside of Rust memory
1054
fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
1055
Ok(())
1056
}
1057
1058
/// For a server pipe, disconnect all clients, discarding any buffered data.
1059
pub fn disconnect_clients(&self) -> Result<()> {
1060
// SAFETY:
1061
// Safe because we own the handle passed in and know it will remain valid for the duration
1062
// of the call. Discarded buffers are not managed by rust.
1063
fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
1064
Ok(())
1065
}
1066
}
1067
1068
impl AsRawDescriptor for PipeConnection {
1069
fn as_raw_descriptor(&self) -> RawDescriptor {
1070
self.handle.as_raw_descriptor()
1071
}
1072
}
1073
1074
impl IntoRawDescriptor for PipeConnection {
1075
fn into_raw_descriptor(self) -> RawDescriptor {
1076
self.handle.into_raw_descriptor()
1077
}
1078
}
1079
1080
// SAFETY: Send safety is ensured by inner fields.
1081
unsafe impl Send for PipeConnection {}
1082
// SAFETY: Sync safety is ensured by inner fields.
1083
unsafe impl Sync for PipeConnection {}
1084
1085
impl io::Read for PipeConnection {
1086
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1087
// SAFETY:
1088
// This is safe because PipeConnection::read is always safe for u8
1089
unsafe { PipeConnection::read(self, buf) }
1090
}
1091
}
1092
1093
impl io::Write for PipeConnection {
1094
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1095
PipeConnection::write(self, buf)
1096
}
1097
1098
fn flush(&mut self) -> io::Result<()> {
1099
Ok(())
1100
}
1101
}
1102
1103
/// Plain-data snapshot of the metadata the OS reports for a NamedPipe.
///
/// Values are filled in by `PipeConnection::get_info` from `GetNamedPipeInfo`.
#[derive(Debug, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Size of the outgoing buffer, as reported by the OS.
    pub outgoing_buffer_size: u32,
    /// Size of the incoming buffer, as reported by the OS.
    pub incoming_buffer_size: u32,
    /// Maximum number of pipe instances, as reported by the OS.
    pub max_instances: u32,
    /// Raw pipe flags, as reported by the OS.
    pub flags: u32,
}
1112
1113
/// This is a wrapper around PipeConnection. This allows a read and a write operations
1114
/// to run in parallel but not multiple reads or writes in parallel.
1115
///
1116
/// Reason: The message from/to service are two-parts - a fixed size header that
1117
/// contains the size of the actual message. By allowing only one write at a time
1118
/// we ensure that the variable size message is written/read right after writing/reading
1119
/// fixed size header. For example it avoid sending or receiving in messages in order like
1120
/// H1, H2, M1, M2
1121
/// - where header H1 and its message M1 are sent by one event loop and H2 and its message M2 are
1122
/// sent by another event loop.
1123
///
1124
/// Do not expose direct access to reader or writer pipes.
1125
///
1126
/// The struct is clone-able so that different event loops can talk to the other end.
1127
#[derive(Clone)]
1128
pub struct MultiPartMessagePipe {
1129
// Lock protected pipe to receive messages.
1130
reader: Arc<Mutex<PipeConnection>>,
1131
// Lock protected pipe to send messages.
1132
writer: Arc<Mutex<PipeConnection>>,
1133
// Whether this end is created as server or client. The variable helps to
1134
// decide if something meanigful should be done when `wait_for_connection` is called.
1135
is_server: bool,
1136
// Always true if pipe is created as client.
1137
// Defaults to false on server. Updated to true on calling `wait_for_connection`
1138
// after a client connects.
1139
is_connected: Arc<AtomicBool>,
1140
}
1141
1142
impl MultiPartMessagePipe {
1143
fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
1144
Ok(Self {
1145
reader: Arc::new(Mutex::new(pipe.try_clone()?)),
1146
writer: Arc::new(Mutex::new(pipe)),
1147
is_server,
1148
is_connected: Arc::new(AtomicBool::new(false)),
1149
})
1150
}
1151
1152
/// Create server side of MutiPartMessagePipe.
1153
/// # Safety
1154
/// `pipe` must be a server named pipe.
1155
#[deny(unsafe_op_in_unsafe_fn)]
1156
pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {
1157
Self::create_from_pipe(pipe, true)
1158
}
1159
1160
/// Create client side of MutiPartMessagePipe.
1161
pub fn create_as_client(pipe_name: &str) -> Result<Self> {
1162
let pipe = create_client_pipe(
1163
&format!(r"\\.\pipe\{pipe_name}"),
1164
&FramingMode::Message,
1165
&BlockingMode::Wait,
1166
/* overlapped= */ true,
1167
)?;
1168
Self::create_from_pipe(pipe, false)
1169
}
1170
1171
/// Create server side of MutiPartMessagePipe.
1172
pub fn create_as_server(pipe_name: &str) -> Result<Self> {
1173
let pipe = create_server_pipe(
1174
&format!(r"\\.\pipe\{pipe_name}",),
1175
&FramingMode::Message,
1176
&BlockingMode::Wait,
1177
0,
1178
1024 * 1024,
1179
true,
1180
)?;
1181
// SAFETY: `pipe` is a server named pipe.
1182
unsafe { Self::create_from_server_pipe(pipe) }
1183
}
1184
1185
/// If the struct is created as a server then waits for client connection to arrive.
1186
/// It only waits on reader as reader and writer are clones.
1187
pub fn wait_for_connection(&self) -> Result<()> {
1188
if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
1189
self.reader.lock().wait_for_client_connection()?;
1190
self.is_connected.store(true, Ordering::Relaxed);
1191
}
1192
Ok(())
1193
}
1194
1195
fn write_overlapped_blocking_message_internal<T: PipeSendable>(
1196
pipe: &mut PipeConnection,
1197
buf: &[T],
1198
overlapped_wrapper: &mut OverlappedWrapper,
1199
) -> Result<()> {
1200
// Safety:
1201
// `buf` and `overlapped_wrapper` will be in use for the duration of
1202
// the overlapped operation. These must not be reused and must live until
1203
// after `get_overlapped_result()` has been called which is done right
1204
// after this call.
1205
unsafe {
1206
pipe.write_overlapped(buf, overlapped_wrapper)?;
1207
}
1208
1209
let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
1210
1211
if size_written_in_bytes as usize != buf.len() {
1212
return Err(std::io::Error::new(
1213
std::io::ErrorKind::UnexpectedEof,
1214
format!(
1215
"Short write expected:{} found:{}",
1216
size_written_in_bytes,
1217
buf.len(),
1218
),
1219
));
1220
}
1221
Ok(())
1222
}
1223
/// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
1224
pub fn write_overlapped_blocking_message<T: PipeSendable>(
1225
&self,
1226
header: &[T],
1227
message: &[T],
1228
overlapped_wrapper: &mut OverlappedWrapper,
1229
) -> Result<()> {
1230
let mut writer = self.writer.lock();
1231
Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
1232
Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
1233
}
1234
1235
/// Reads a variable size message and returns the message on success.
1236
/// The size of the message is expected to proceed the message in
1237
/// the form of `header_size` message.
1238
///
1239
/// `parse_message_size` lets caller parse the header to extract
1240
/// message size.
1241
///
1242
/// Event on `exit_event` is used to interrupt the blocked read.
1243
pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
1244
&self,
1245
header_size: usize,
1246
parse_message_size: F,
1247
overlapped_wrapper: &mut OverlappedWrapper,
1248
exit_event: &Event,
1249
) -> Result<Vec<u8>> {
1250
let mut pipe = self.reader.lock();
1251
let mut header = vec![0; header_size];
1252
header.resize_with(header_size, Default::default);
1253
pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
1254
let message_size = parse_message_size(&header);
1255
if message_size == 0 {
1256
return Ok(vec![]);
1257
}
1258
let mut buf = vec![];
1259
buf.resize_with(message_size, Default::default);
1260
pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
1261
Ok(buf)
1262
}
1263
1264
/// Returns the inner named pipe if the current struct is the sole owner of the underlying
1265
/// named pipe.
1266
///
1267
/// Otherwise, [`None`] is returned and the struct is dropped.
1268
///
1269
/// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads
1270
/// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is
1271
/// possible that all of them will result in [`None`]. This is Due to Rust version
1272
/// restriction(1.68.2) when this function is introduced). This race condition can be resolved
1273
/// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].
1274
///
1275
/// If the underlying named pipe is a server named pipe, this method allows the caller to
1276
/// terminate the connection by first flushing the named pipe then disconnecting the clients
1277
/// idiomatically per
1278
/// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.
1279
pub fn into_inner_pipe(self) -> Option<PipeConnection> {
1280
let piper = Arc::clone(&self.reader);
1281
drop(self);
1282
Arc::try_unwrap(piper).ok().map(Mutex::into_inner)
1283
}
1284
}
1285
1286
impl TryFrom<PipeConnection> for MultiPartMessagePipe {
1287
type Error = std::io::Error;
1288
fn try_from(pipe: PipeConnection) -> Result<Self> {
1289
Self::create_from_pipe(pipe, false)
1290
}
1291
}
1292
1293
#[cfg(test)]
mod tests {
    use std::mem::size_of;
    use std::thread::JoinHandle;
    use std::time::Duration;

    use super::*;

    /// In byte mode a single write can be consumed in several smaller reads.
    #[test]
    fn duplex_pipe_stream() {
        let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

        // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
        // SAFETY: trivially safe with pipe created and return value checked.
        unsafe {
            for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
                println!("{dir}");

                sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();

                // Smaller than what we sent so we get multiple chunks
                let mut chunk: [u8; 4] = [0; 4];

                let first_len = receiver.read(&mut chunk).unwrap();
                assert_eq!(first_len, 4);
                assert_eq!(chunk, [75, 77, 54, 82]);

                let second_len = receiver.read(&mut chunk).unwrap();
                assert_eq!(second_len, 2);
                assert_eq!(chunk[0..2], [76, 65]);
            }
        }
    }

    /// Peeking at the available byte count of a byte-mode pipe does not consume data.
    #[test]
    fn available_byte_count_byte_mode() {
        let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
        p1.write(&[1, 23, 45]).unwrap();

        // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
        // yield the same value.
        for _attempt in 0..2 {
            assert_eq!(p2.get_available_byte_count().unwrap(), 3);
        }
    }

    /// Peeking at the available byte count of a message-mode pipe does not consume data.
    #[test]
    fn available_byte_count_message_mode() {
        let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
        p1.write(&[1, 23, 45]).unwrap();

        // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
        // yield the same value.
        for _attempt in 0..2 {
            assert_eq!(p2.get_available_byte_count().unwrap(), 3);
        }
    }

    /// The available byte count spans every queued message, not just the first one.
    #[test]
    fn available_byte_count_message_mode_multiple_messages() {
        let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
        p1.write(&[1, 2, 3]).unwrap();
        p1.write(&[4, 5]).unwrap();

        let available = p2.get_available_byte_count().unwrap();
        assert_eq!(available, 5);
    }

    /// In message mode each read returns exactly one previously written message.
    #[test]
    fn duplex_pipe_message() {
        let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

        // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
        // SAFETY: trivially safe with pipe created and return value checked.
        unsafe {
            for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
                println!("{dir}");

                // Send 2 messages so that we can check that message framing works
                sender.write(&[1, 23, 45]).unwrap();
                sender.write(&[67, 89, 10]).unwrap();

                // Larger than required for messages
                let mut message: [u8; 5] = [0; 5];

                let first_len = receiver.read(&mut message).unwrap();
                assert_eq!(first_len, 3);
                assert_eq!(message[0..3], [1, 23, 45]);

                let second_len = receiver.read(&mut message).unwrap();
                assert_eq!(second_len, 3);
                assert_eq!(message[0..3], [67, 89, 10]);
            }
        }
    }

    /// Shared body for the non-blocking tests: one byte goes through, then the pipe is empty
    /// and a further read must report `WouldBlock`.
    fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
        let mut one_byte: [u8; 1] = [0; 1];

        // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
        // SAFETY: trivially safe with PipeConnection created and return value checked.
        unsafe {
            for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
                println!("{dir}");
                sender.write(&[1]).unwrap();

                // Should succeed!
                let received = receiver.read(&mut one_byte).unwrap();
                assert_eq!(received, 1);

                let empty_read = receiver.read(&mut one_byte).unwrap_err();
                assert_eq!(empty_read.kind(), std::io::ErrorKind::WouldBlock);
            }
        }
    }

    /// Pipes created in non-blocking mode behave as non-blocking.
    #[test]
    fn duplex_nowait() {
        let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
        duplex_nowait_helper(&p1, &p2);
    }

    /// Pipes switched to non-blocking mode after creation behave as non-blocking.
    #[test]
    fn duplex_nowait_set_after_creation() {
        // Tests non blocking setting after pipe creation
        let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

        let no_wait = BlockingMode::NoWait;
        p1.set_blocking(&no_wait)
            .expect("Failed to set blocking mode on pipe p1");
        p2.set_blocking(&no_wait)
            .expect("Failed to set blocking mode on pipe p2");

        duplex_nowait_helper(&p1, &p2);
    }

    /// An overlapped write followed by an overlapped read transfers the payload intact.
    #[test]
    fn duplex_overlapped() {
        let pipe_name = generate_pipe_name();

        let mut p1 = create_server_pipe(
            &pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* timeout= */ 0,
            /* buffer_size= */ 1000,
            /* overlapped= */ true,
        )
        .unwrap();

        let mut p2 = create_client_pipe(
            &pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* overlapped= */ true,
        )
        .unwrap();

        // SAFETY:
        // Safe because `read_overlapped` can be called since overlapped struct is created.
        unsafe {
            let mut p1_overlapped_wrapper =
                OverlappedWrapper::new(/* include_event= */ true).unwrap();
            p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
                .unwrap();
            let written = p1
                .get_overlapped_result(&mut p1_overlapped_wrapper)
                .unwrap();
            assert_eq!(written, 6);

            let mut recv_buffer: [u8; 6] = [0; 6];

            let mut p2_overlapped_wrapper =
                OverlappedWrapper::new(/* include_event= */ true).unwrap();
            p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
                .unwrap();
            let received = p2
                .get_overlapped_result(&mut p2_overlapped_wrapper)
                .unwrap();
            assert_eq!(received, 6);
            assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
        }
    }

    /// The `in_use` bookkeeping on an overlapped wrapper rejects misuse and allows reuse
    /// once the previous operation's result has been collected.
    #[test]
    fn duplex_overlapped_test_in_use() {
        let pipe_name = generate_pipe_name();

        let mut p1 = create_server_pipe(
            &pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* timeout= */ 0,
            /* buffer_size= */ 1000,
            /* overlapped= */ true,
        )
        .unwrap();

        let mut p2 = create_client_pipe(
            &pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* overlapped= */ true,
        )
        .unwrap();
        let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();

        // Nothing has been started yet, so there is no result to fetch.
        let idle_result = p1.get_overlapped_result(&mut overlapped_wrapper);
        assert!(idle_result.is_err());

        let data = vec![75, 77, 54, 82, 76, 65];
        // SAFETY: safe because: data & overlapped wrapper live until the
        // operation is verified completed below.
        let first_write = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
        assert!(first_write.is_ok());

        let reused_write =
            // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
            // will error out.
            unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
        assert!(reused_write.is_err());

        let mut recv_buffer: [u8; 6] = [0; 6];
        // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
        // will error out.
        let reused_read = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
        assert!(reused_read.is_err());

        // Collecting the result releases the wrapper for the next operation.
        let write_result = p1.get_overlapped_result(&mut overlapped_wrapper);
        assert!(write_result.is_ok());

        let mut recv_buffer: [u8; 6] = [0; 6];
        // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
        // operation is verified completed below.
        let read_started = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
        assert!(read_started.is_ok());
        let read_result = p2.get_overlapped_result(&mut overlapped_wrapper);
        assert!(read_result.is_ok());
    }

    /// Produces a randomized pipe path so concurrently running tests do not collide.
    fn generate_pipe_name() -> String {
        let suffix = rand::random::<u64>();
        format!(r"\\.\pipe\test-ipc-pipe-name.rand{}", suffix)
    }

    /// Spawns a thread that first sends `msg_count` random messages over `pipe` and then
    /// receives `msg_count` messages, checking each one against the known message table.
    fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
        let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
        std::thread::spawn(move || {
            let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
            let exit_event = Event::new().unwrap();

            // Sending phase: the header is the big-endian length of the message.
            for _sent in 0..msg_count {
                let message = messages[rand::random_range(..messages.len())];
                pipe.write_overlapped_blocking_message(
                    &message.len().to_be_bytes(),
                    message.as_bytes(),
                    &mut overlapped_wrapper,
                )
                .unwrap();
            }

            // Receiving phase: every table entry has a unique length, so the length of what
            // arrived identifies which entry it must equal.
            for _received in 0..msg_count {
                let message = pipe
                    .read_overlapped_blocking_message(
                        size_of::<usize>(),
                        |bytes: &[u8]| {
                            assert_eq!(bytes.len(), size_of::<usize>());
                            usize::from_be_bytes(
                                bytes.try_into().expect("failed to get array from slice"),
                            )
                        },
                        &mut overlapped_wrapper,
                        &exit_event,
                    )
                    .unwrap();
                assert_eq!(
                    *messages.get(message.len() - 1).unwrap(),
                    std::str::from_utf8(&message).unwrap(),
                );
            }
        })
    }

    /// Several threads share both ends of one pipe without corrupting header/message pairs.
    #[test]
    fn multipart_message_smoke_test() {
        let pipe_name = generate_pipe_name();
        let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
        let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();

        let workers = [
            send_receive_msgs(server.clone(), 100),
            send_receive_msgs(client.clone(), 100),
            send_receive_msgs(server, 100),
            send_receive_msgs(client, 100),
        ];
        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// `into_inner_pipe` only yields the pipe to the last remaining clone.
    #[test]
    fn multipart_message_into_inner_pipe() {
        let pipe_name = generate_pipe_name();
        let mut pipe = create_server_pipe(
            &format!(r"\\.\pipe\{pipe_name}"),
            &FramingMode::Message,
            &BlockingMode::Wait,
            0,
            1024 * 1024,
            true,
        )
        .expect("should create the server pipe with success");

        let server1 = {
            let pipe = pipe
                .try_clone()
                .expect("should duplicate the named pipe with success");
            // SAFETY: `pipe` is a server named pipe.
            unsafe { MultiPartMessagePipe::create_from_server_pipe(pipe) }
                .expect("should create the multipart message pipe with success")
        };
        let server2 = server1.clone();
        assert!(
            server2.into_inner_pipe().is_none(),
            "not the last reference, should be None"
        );
        let inner_pipe = server1
            .into_inner_pipe()
            .expect("the last reference, should return the underlying pipe");

        // CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use
        // that API to compare if 2 handles are the same.
        // Instead, toggle the blocking mode through the original handle and check that both
        // handles keep reporting the same pipe information.
        for mode in [BlockingMode::NoWait, BlockingMode::Wait] {
            pipe.set_blocking(&mode)
                .expect("should set the blocking mode on the original pipe with success");
            assert_eq!(
                pipe.get_info()
                    .expect("should get the pipe information on the original pipe successfully"),
                inner_pipe
                    .get_info()
                    .expect("should get the pipe information on the inner pipe successfully")
            );
        }
    }

    /// The interruptible wait returns successfully once a client connects.
    #[test]
    fn test_wait_for_connection_blocking() {
        let pipe_name = generate_pipe_name();

        let mut server_pipe = create_server_pipe(
            &pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* timeout= */ 0,
            /* buffer_size= */ 1000,
            /* overlapped= */ true,
        )
        .unwrap();

        let server = crate::thread::spawn_with_timeout(move || {
            let exit_event = Event::new().unwrap();
            server_pipe
                .wait_for_client_connection_overlapped_blocking(&exit_event)
                .unwrap();
        });

        // Keep the client alive until the server thread has been joined.
        let _client = create_client_pipe(
            &pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* overlapped= */ true,
        )
        .unwrap();
        server.try_join(Duration::from_secs(10)).unwrap();
    }

    /// The interruptible wait returns an error once the exit event is signalled.
    #[test]
    fn test_wait_for_connection_blocking_exit_triggered() {
        let pipe_name = generate_pipe_name();

        let mut server_pipe = create_server_pipe(
            &pipe_name,
            &FramingMode::Message,
            &BlockingMode::Wait,
            /* timeout= */ 0,
            /* buffer_size= */ 1000,
            /* overlapped= */ true,
        )
        .unwrap();

        let exit_event = Event::new().unwrap();
        let exit_event_for_server = exit_event.try_clone().unwrap();
        let server = crate::thread::spawn_with_timeout(move || {
            let wait_result =
                server_pipe.wait_for_client_connection_overlapped_blocking(&exit_event_for_server);
            assert!(wait_result.is_err());
        });

        exit_event.signal().unwrap();
        server.try_join(Duration::from_secs(10)).unwrap();
    }

    /// Through `std::io::Read`, a closed writer shows up as repeated zero-length reads.
    #[test]
    fn std_io_read_eof() {
        let (mut w, mut r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
        std::io::Write::write(&mut w, &[1, 2, 3]).unwrap();
        std::mem::drop(w);

        let mut buffer: [u8; 4] = [0; 4];
        let first_len = std::io::Read::read(&mut r, &mut buffer).unwrap();
        assert_eq!(first_len, 3);
        assert_eq!(buffer, [1, 2, 3, 0]);

        // Every read after the data is drained must report end-of-file.
        for _attempt in 0..2 {
            assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);
        }
    }

    /// Through `std::io::Write`, writing to a pipe whose reader is gone fails with `BrokenPipe`.
    #[test]
    fn std_io_write_eof() {
        let (mut w, r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
        std::mem::drop(r);
        let result = std::io::Write::write(&mut w, &[1, 2, 3]);
        // Not required to return BrokenPipe here, something like Ok(0) is also acceptable.
        assert!(
            matches!(&result, Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe),
            "expected Err(BrokenPipe), got {result:?}"
        );
    }
}
1712
1713