// Copyright 2022 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::ffi::CString;5use std::fs::OpenOptions;6use std::io;7use std::io::Result;8use std::mem;9use std::os::windows::fs::OpenOptionsExt;10use std::process;11use std::ptr;12use std::sync::atomic::AtomicBool;13use std::sync::atomic::AtomicUsize;14use std::sync::atomic::Ordering;15use std::sync::Arc;1617use serde::Deserialize;18use serde::Serialize;19use sync::Mutex;20use win_util::fail_if_zero;21use win_util::SecurityAttributes;22use win_util::SelfRelativeSecurityDescriptor;23use winapi::shared::minwindef::DWORD;24use winapi::shared::minwindef::FALSE;25use winapi::shared::minwindef::TRUE;26use winapi::shared::winerror::ERROR_BROKEN_PIPE;27use winapi::shared::winerror::ERROR_IO_INCOMPLETE;28use winapi::shared::winerror::ERROR_IO_PENDING;29use winapi::shared::winerror::ERROR_MORE_DATA;30use winapi::shared::winerror::ERROR_NO_DATA;31use winapi::shared::winerror::ERROR_PIPE_CONNECTED;32use winapi::um::errhandlingapi::GetLastError;33use winapi::um::fileapi::FlushFileBuffers;34use winapi::um::handleapi::INVALID_HANDLE_VALUE;35use winapi::um::ioapiset::CancelIoEx;36use winapi::um::ioapiset::GetOverlappedResult;37use winapi::um::minwinbase::OVERLAPPED;38use winapi::um::namedpipeapi::ConnectNamedPipe;39use winapi::um::namedpipeapi::DisconnectNamedPipe;40use winapi::um::namedpipeapi::GetNamedPipeInfo;41use winapi::um::namedpipeapi::PeekNamedPipe;42use winapi::um::namedpipeapi::SetNamedPipeHandleState;43use winapi::um::winbase::CreateNamedPipeA;44use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;45use winapi::um::winbase::FILE_FLAG_OVERLAPPED;46use winapi::um::winbase::PIPE_ACCESS_DUPLEX;47use winapi::um::winbase::PIPE_NOWAIT;48use winapi::um::winbase::PIPE_READMODE_BYTE;49use winapi::um::winbase::PIPE_READMODE_MESSAGE;50use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;51use winapi::um::winbase::PIPE_TYPE_BYTE;52use winapi::um::winbase::PIPE_TYPE_MESSAGE;53use winapi::um::winbase::PIPE_WAIT;54use winapi::um::winbase::SECURITY_IDENTIFICATION;5556use super::RawDescriptor;57use crate::descriptor::AsRawDescriptor;58use crate::descriptor::FromRawDescriptor;59use crate::descriptor::IntoRawDescriptor;60use crate::descriptor::SafeDescriptor;61use crate::Event;62use crate::EventToken;63use crate::WaitContext;6465/// The default buffer size for all named pipes in the system. If this size is too small, writers66/// on named pipes that expect not to block *can* block until the reading side empties the buffer.67///68/// The general rule is this should be *at least* as big as the largest message, otherwise69/// unexpected blocking behavior can result; for example, if too small, this can interact badly with70/// crate::windows::StreamChannel, which expects to be able to make a complete write before71/// releasing a lock that the opposite side needs to complete a read. This means that if the buffer72/// is too small:73/// * The writer can't complete its write and release the lock because the buffer is too small.74/// * The reader can't start reading because the lock is held by the writer, so it can't relieve75/// buffer pressure. Note that for message pipes, the reader couldn't do anything to help76/// anyway, because a message mode pipe should NOT have a partial read (which is what we would77/// need to relieve pressure).78/// * Conditions for deadlock are met, and both the reader & writer enter circular waiting.79pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;8081static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);8283#[remain::sorted]84#[derive(Debug, thiserror::Error)]85pub enum PipeError {86#[error("read zero bytes, but this is not an EOF")]87ZeroByteReadNoEof,88}8990/// Represents one end of a named pipe91///92/// NOTE: implementations of Read & Write are trait complaint for EOF/broken pipe handling93/// (returning a successful zero byte read), but overlapped read/write versions are NOT (they will94/// return broken pipe directly due to API limitations; see PipeConnection::read for95/// details).96#[derive(Serialize, Deserialize, Debug)]97pub struct PipeConnection {98handle: SafeDescriptor,99framing_mode: FramingMode,100blocking_mode: BlockingMode,101}102103/// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.104///105/// Defined as a separate type so that we can mark it as `Send` and `Sync`.106pub struct BoxedOverlapped(pub Box<OVERLAPPED>);107108// SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw109// pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.110unsafe impl Send for BoxedOverlapped {}111112// SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.113unsafe impl Sync for BoxedOverlapped {}114115/// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a116/// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.117pub struct OverlappedWrapper {118overlapped: BoxedOverlapped,119// This field prevents the event handle from being dropped too early and allows callers to120// be notified when a read or write overlapped operation has completed.121h_event: Option<Event>,122in_use: bool,123}124125impl OverlappedWrapper {126pub fn get_h_event_ref(&self) -> Option<&Event> {127self.h_event.as_ref()128}129130/// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order131/// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object132/// returned must not be dropped.133///134/// There is an option to create the event object and set it to the `hEvent` field. If hEvent135/// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file136/// handle will be signaled when the operation is complete. In other words, you can use137/// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by138/// Microsoft though.139pub fn new(include_event: bool) -> Result<OverlappedWrapper> {140let mut overlapped = OVERLAPPED::default();141let h_event = if include_event {142Some(Event::new()?)143} else {144None145};146147overlapped.hEvent = if let Some(event) = h_event.as_ref() {148event.as_raw_descriptor()149} else {1500 as RawDescriptor151};152153Ok(OverlappedWrapper {154overlapped: BoxedOverlapped(Box::new(overlapped)),155h_event,156in_use: false,157})158}159}160161pub trait WriteOverlapped {162/// Perform an overlapped write operation with the specified buffer and overlapped wrapper.163/// If successful, the write operation will complete asynchronously, and164/// `write_result()` should be called to get the result.165///166/// # Safety167/// `buf` and `overlapped_wrapper` will be in use for the duration of168/// the overlapped operation. These must not be reused and must live until169/// after `write_result()` has been called.170unsafe fn write_overlapped(171&mut self,172buf: &mut [u8],173overlapped_wrapper: &mut OverlappedWrapper,174) -> io::Result<()>;175176/// Gets the result of the overlapped write operation. Must only be called177/// after issuing an overlapped write operation using `write_overlapped`. The178/// same `overlapped_wrapper` must be provided.179fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;180181/// Tries to get the result of the overlapped write operation. Must only be182/// called once, and only after issuing an overlapped write operation using183/// `write_overlapped`. The same `overlapped_wrapper` must be provided.184///185/// An error indicates that the operation hasn't completed yet and186/// `write_result` or `try_write_result` should be called again.187fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)188-> io::Result<usize>;189}190191pub trait ReadOverlapped {192/// Perform an overlapped read operation with the specified buffer and overlapped wrapper.193/// If successful, the read operation will complete asynchronously, and194/// `read_result()` should be called to get the result.195///196/// # Safety197/// `buf` and `overlapped_wrapper` will be in use for the duration of198/// the overlapped operation. These must not be reused and must live until199/// after `read_result()` has been called.200unsafe fn read_overlapped(201&mut self,202buf: &mut [u8],203overlapped_wrapper: &mut OverlappedWrapper,204) -> io::Result<()>;205206/// Gets the result of the overlapped read operation. Must only be called207/// once, and only after issuing an overlapped read operation using208/// `read_overlapped`. The same `overlapped_wrapper` must be provided.209fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;210211/// Tries to get the result of the overlapped read operation. Must only be called212/// after issuing an overlapped read operation using `read_overlapped`. The213/// same `overlapped_wrapper` must be provided.214///215/// An error indicates that the operation hasn't completed yet and216/// `read_result` or `try_read_result` should be called again.217fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;218}219220#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]221pub enum FramingMode {222Byte,223Message,224}225226impl FramingMode {227fn to_readmode(self) -> DWORD {228match self {229FramingMode::Message => PIPE_READMODE_MESSAGE,230FramingMode::Byte => PIPE_READMODE_BYTE,231}232}233234fn to_pipetype(self) -> DWORD {235match self {236FramingMode::Message => PIPE_TYPE_MESSAGE,237FramingMode::Byte => PIPE_TYPE_BYTE,238}239}240}241242#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]243pub enum BlockingMode {244/// Calls to read() block until data is received245Wait,246/// Calls to read() return immediately even if there is nothing read with error code 232247/// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)248///249/// NOTE: This mode is discouraged by the Windows API documentation.250NoWait,251}252253impl From<&BlockingMode> for DWORD {254fn from(blocking_mode: &BlockingMode) -> DWORD {255match blocking_mode {256BlockingMode::Wait => PIPE_WAIT,257BlockingMode::NoWait => PIPE_NOWAIT,258}259}260}261262/// Sets the handle state for a named pipe in a rust friendly way.263/// SAFETY:264/// This is safe if the pipe handle is open.265unsafe fn set_named_pipe_handle_state(266pipe_handle: RawDescriptor,267client_mode: &mut DWORD,268) -> Result<()> {269// Safe when the pipe handle is open. Safety also requires checking the return value, which we270// do below.271let success_flag = SetNamedPipeHandleState(272/* hNamedPipe= */ pipe_handle,273/* lpMode= */ client_mode,274/* lpMaxCollectionCount= */ ptr::null_mut(),275/* lpCollectDataTimeout= */ ptr::null_mut(),276);277if success_flag == 0 {278Err(io::Error::last_os_error())279} else {280Ok(())281}282}283284pub fn pair(285framing_mode: &FramingMode,286blocking_mode: &BlockingMode,287timeout: u64,288) -> Result<(PipeConnection, PipeConnection)> {289pair_with_buffer_size(290framing_mode,291blocking_mode,292timeout,293DEFAULT_BUFFER_SIZE,294false,295)296}297298/// Creates a pair of handles connected to either end of a duplex named pipe.299///300/// The pipe created will have a semi-random name and a default set of security options that301/// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject302/// remote clients, allow only a single server instance, and prevent impersonation by the server303/// end of the pipe.304///305/// # Arguments306///307/// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an308/// automatically framed sequence of messages (Message). In message mode it's an error to read309/// fewer bytes than were sent in a message from the other end of the pipe.310/// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or311/// return immediately if there is nothing available (NoWait).312/// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to313/// zero will create sockets with the system default timeout.314/// * `buffer_size` - The default buffer size for the named pipe. The system should expand the315/// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail316/// writes that don't fit in the buffer.317/// # Return value318///319/// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as320/// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.321pub fn pair_with_buffer_size(322framing_mode: &FramingMode,323blocking_mode: &BlockingMode,324timeout: u64,325buffer_size: usize,326overlapped: bool,327) -> Result<(PipeConnection, PipeConnection)> {328// Give the pipe a unique name to avoid accidental collisions329let pipe_name = format!(330r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",331process::id(),332NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),333rand::random::<u32>(),334);335336let server_end = create_server_pipe(337&pipe_name,338framing_mode,339blocking_mode,340timeout,341buffer_size,342overlapped,343)?;344345// Open the named pipe we just created as the client346let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;347348// Accept the client's connection349// Not sure if this is strictly needed but I'm doing it just in case.350// We expect at this point that the client will already be connected,351// so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.352// It's also OK if we get a return code of success.353server_end.wait_for_client_connection()?;354355Ok((server_end, client_end))356}357358/// Creates a PipeConnection for the server end of a named pipe with the given path and pipe359/// settings.360///361/// The pipe will be set to reject remote clients and allow only a single connection at a time.362///363/// # Arguments364///365/// * `pipe_name` - The path of the named pipe to create. Should be in the form366/// `\\.\pipe\<some-name>`.367/// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an368/// automatically framed sequence of messages (Message). In message mode it's an error to read369/// fewer bytes than were sent in a message from the other end of the pipe.370/// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or371/// return immediately if there is nothing available (NoWait).372/// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to373/// zero will create sockets with the system default timeout.374/// * `buffer_size` - The default buffer size for the named pipe. The system should expand the375/// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail376/// writes that don't fit in the buffer.377/// * `overlapped` - Sets whether overlapped mode is set on the pipe.378pub fn create_server_pipe(379pipe_name: &str,380framing_mode: &FramingMode,381blocking_mode: &BlockingMode,382timeout: u64,383buffer_size: usize,384overlapped: bool,385) -> Result<PipeConnection> {386let c_pipe_name = CString::new(pipe_name).unwrap();387388let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;389if overlapped {390open_mode_flags |= FILE_FLAG_OVERLAPPED391}392393// This sets flags so there will be an error if >1 instance (server end)394// of this pipe name is opened because we expect exactly one.395// SAFETY:396// Safe because security attributes are valid, pipe_name is valid C string,397// and we're checking the return code398let server_handle = unsafe {399CreateNamedPipeA(400c_pipe_name.as_ptr(),401/* dwOpenMode= */402open_mode_flags,403/* dwPipeMode= */404framing_mode.to_pipetype()405| framing_mode.to_readmode()406| DWORD::from(blocking_mode)407| PIPE_REJECT_REMOTE_CLIENTS,408/* nMaxInstances= */ 1,409/* nOutBufferSize= */ buffer_size as DWORD,410/* nInBufferSize= */ buffer_size as DWORD,411/* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms412/* lpSecurityAttributes= */413SecurityAttributes::new_with_security_descriptor(414SelfRelativeSecurityDescriptor::get_singleton(),415/* inherit= */ true,416)417.as_mut(),418)419};420421if server_handle == INVALID_HANDLE_VALUE {422Err(io::Error::last_os_error())423} else {424// SAFETY: Safe because server_handle is valid.425unsafe {426Ok(PipeConnection {427handle: SafeDescriptor::from_raw_descriptor(server_handle),428framing_mode: *framing_mode,429blocking_mode: *blocking_mode,430})431}432}433}434435/// Creates a PipeConnection for the client end of a named pipe with the given path and pipe436/// settings.437///438/// The pipe will be set to prevent impersonation of the client by the server process.439///440/// # Arguments441///442/// * `pipe_name` - The path of the named pipe to create. Should be in the form443/// `\\.\pipe\<some-name>`.444/// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an445/// automatically framed sequence of messages (Message). In message mode it's an error to read446/// fewer bytes than were sent in a message from the other end of the pipe.447/// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or448/// return immediately if there is nothing available (NoWait).449/// * `overlapped` - Sets whether the pipe is opened in overlapped mode.450pub fn create_client_pipe(451pipe_name: &str,452framing_mode: &FramingMode,453blocking_mode: &BlockingMode,454overlapped: bool,455) -> Result<PipeConnection> {456let client_handle = OpenOptions::new()457.read(true)458.write(true)459.create(true)460.security_qos_flags(SECURITY_IDENTIFICATION)461.custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })462.open(pipe_name)?463.into_raw_descriptor();464465let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);466467// SAFETY:468// Safe because client_handle's open() call did not return an error.469unsafe {470set_named_pipe_handle_state(client_handle, &mut client_mode)?;471}472473Ok(PipeConnection {474// SAFETY:475// Safe because client_handle is valid476handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },477framing_mode: *framing_mode,478blocking_mode: *blocking_mode,479})480}481482// This is used to mark types which can be appropriately sent through the483// generic helper functions write_to_pipe and read_from_pipe.484pub trait PipeSendable {485// Default values used to fill in new empty indexes when resizing a buffer to486// a larger size.487fn default() -> Self;488}489impl PipeSendable for u8 {490fn default() -> Self {4910492}493}494impl PipeSendable for RawDescriptor {495fn default() -> Self {496ptr::null_mut()497}498}499500impl PipeConnection {501pub fn try_clone(&self) -> Result<PipeConnection> {502let copy_handle = self.handle.try_clone()?;503Ok(PipeConnection {504handle: copy_handle,505framing_mode: self.framing_mode,506blocking_mode: self.blocking_mode,507})508}509510/// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &511/// blocking modes.512///513/// # Safety514/// 1. rd is valid and ownership is transferred to this function when it is called.515///516/// To avoid undefined behavior, framing_mode & blocking_modes must match those of the517/// underlying pipe.518pub unsafe fn from_raw_descriptor(519rd: RawDescriptor,520framing_mode: FramingMode,521blocking_mode: BlockingMode,522) -> PipeConnection {523PipeConnection {524handle: SafeDescriptor::from_raw_descriptor(rd),525framing_mode,526blocking_mode,527}528}529530/// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.531/// Returns the number of bytes (not values) read.532///533/// # Safety534///535/// This is safe only when the following conditions hold:536/// 1. The data on the other end of the pipe is a valid binary representation of data for537/// type T, and538/// 2. The number of bytes read is a multiple of the size of T; this must be checked by539/// the caller.540/// If buf's type is file descriptors, this is only safe when those file descriptors are valid541/// for the process where this function was called.542pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {543match PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None) {544// Windows allows for zero byte writes on one end of a pipe to be read by the other as545// zero byte reads. These zero byte reads DO NOT signify EOF, so from the perspective546// of std::io::Read, they cannot be reported as Ok(0). We translate them to errors.547//548// Within CrosVM, this behavior is not used, but it has been implemented to avoid UB549// either in the future, or when talking to non CrosVM named pipes. If we need to550// actually use/understand this error from other parts of KiwiVM (e.g. PipeConnection551// consumers), we could use ErrorKind::Interrupted (which as of 24/11/26 is not used by552// Rust for other purposes).553Ok(len) if len == 0 && !buf.is_empty() => {554Err(io::Error::other(PipeError::ZeroByteReadNoEof))555}556557// Read at least 1 byte, or 0 bytes if a zero byte buffer was provided.558Ok(len) => Ok(len),559560// Treat a closed pipe like an EOF, because that is consistent with the Read trait.561//562// NOTE: this is explicitly NOT done for overlapped operations for a few reasons:563// 1. Overlapped operations do not follow the Read trait, so there is no strong reason564// *to* do it.565// 2. Ok(0) also means "overlapped operation started successfully." This is a real566// problem because the general pattern is to start an overlapped operation and then567// wait for it. So if we did that and the Ok(0) meant the pipe is closed, we would568// enter an infinite wait. (The kernel already told us when we started the operation569// that the pipe was closed. It won't tell us again.)570Err(e) if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) => Ok(0),571572Err(e) => Err(e),573}574}575576/// Similar to `PipeConnection::read` except it also allows:577/// 1. The same end of the named pipe to read and write at the same time in different578/// threads.579/// 2. Asynchronous read and write (read and write won't block).580///581/// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event582/// (can be created with `OverlappedWrapper::new`) will be passed into583/// `ReadFile`. That event will be triggered when the read operation is complete.584///585/// In order to get how many bytes were read, call `get_overlapped_result`. That function will586/// also help with waiting until the read operation is complete.587///588/// # Safety589///590/// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in591/// overlapped mode otherwise there may be unexpected behavior.592pub unsafe fn read_overlapped<T: PipeSendable>(593&mut self,594buf: &mut [T],595overlapped_wrapper: &mut OverlappedWrapper,596) -> Result<()> {597if overlapped_wrapper.in_use {598return Err(std::io::Error::new(599std::io::ErrorKind::InvalidInput,600"Overlapped struct already in use",601));602}603overlapped_wrapper.in_use = true;604605PipeConnection::read_internal(606&self.handle,607self.blocking_mode,608buf,609Some(&mut overlapped_wrapper.overlapped.0),610)?;611Ok(())612}613614/// Helper for `read_overlapped` and `read`615///616/// # Safety617/// Comments `read_overlapped` or `read`, depending on which is used.618unsafe fn read_internal<T: PipeSendable>(619handle: &SafeDescriptor,620blocking_mode: BlockingMode,621buf: &mut [T],622overlapped: Option<&mut OVERLAPPED>,623) -> Result<usize> {624let res = crate::windows::read_file(625handle,626buf.as_mut_ptr() as *mut u8,627mem::size_of_val(buf),628overlapped,629);630match res {631Ok(bytes_read) => Ok(bytes_read),632// For message mode pipes, if the buffer is too small for the entire message, the kernel633// will return ERROR_MORE_DATA. This isn't strictly an "error" because the operation634// succeeds. Making it an error also means it's hard to handle this cleanly from the635// perspective of an io::Read consumer. So we discard the non-error, and return the636// successful result of filling the entire buffer.637Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(buf.len()),638Err(e)639if blocking_mode == BlockingMode::NoWait640&& e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>641{642// A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,643// this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not644// correct. For further details see:645// https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-646// https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes647Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))648}649Err(e) => Err(e),650}651}652653/// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted654/// by an event on `exit_event`.655pub fn read_overlapped_blocking<T: PipeSendable>(656&mut self,657buf: &mut [T],658overlapped_wrapper: &mut OverlappedWrapper,659exit_event: &Event,660) -> Result<()> {661// SAFETY:662// Safe because we are providing a valid buffer slice and also providing a valid663// overlapped struct.664match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {665Err(e) => Err(e),666Ok(()) => Ok(()),667}?;668669#[derive(EventToken)]670enum Token {671ReadOverlapped,672Exit,673}674675let wait_ctx = WaitContext::build_with(&[676(677overlapped_wrapper.get_h_event_ref().unwrap(),678Token::ReadOverlapped,679),680(exit_event, Token::Exit),681])?;682683let events = wait_ctx.wait()?;684for event in events {685match event.token {686Token::ReadOverlapped => {687let size_read_in_bytes =688self.get_overlapped_result(overlapped_wrapper)? as usize;689690// If this error shows, most likely the overlapped named pipe was set up691// incorrectly.692if size_read_in_bytes != buf.len() {693return Err(std::io::Error::new(694std::io::ErrorKind::UnexpectedEof,695"Short read",696));697}698}699Token::Exit => {700return Err(std::io::Error::new(701std::io::ErrorKind::Interrupted,702"IO canceled on exit request",703));704}705}706}707708Ok(())709}710711/// Gets the size in bytes of data in the pipe.712///713/// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have714/// not finished writing on the producer side.715pub fn get_available_byte_count(&self) -> io::Result<u32> {716let mut total_bytes_avail: DWORD = 0;717718// SAFETY:719// Safe because the underlying pipe handle is guaranteed to be open, and the output values720// live at valid memory locations.721fail_if_zero!(unsafe {722PeekNamedPipe(723self.as_raw_descriptor(),724ptr::null_mut(),7250,726ptr::null_mut(),727&mut total_bytes_avail,728ptr::null_mut(),729)730});731732Ok(total_bytes_avail)733}734735/// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which736/// callers should check to ensure that it was the number expected.737pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {738// SAFETY: overlapped is None so this is safe.739unsafe { PipeConnection::write_internal(&self.handle, buf, None) }740}741742/// Similar to `PipeConnection::write` except it also allows:743/// 1. The same end of the named pipe to read and write at the same time in different744/// threads.745/// 2. Asynchronous read and write (read and write won't block).746///747/// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event748/// (can be created with `OverlappedWrapper::new`) will be passed into749/// `WriteFile`. That event will be triggered when the write operation is complete.750///751/// In order to get how many bytes were written, call `get_overlapped_result`. That function752/// will also help with waiting until the write operation is complete. The pipe must be753/// opened in overlapped otherwise there may be unexpected behavior.754///755/// # Safety756/// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.757pub unsafe fn write_overlapped<T: PipeSendable>(758&mut self,759buf: &[T],760overlapped_wrapper: &mut OverlappedWrapper,761) -> Result<()> {762if overlapped_wrapper.in_use {763return Err(std::io::Error::new(764std::io::ErrorKind::InvalidInput,765"Overlapped struct already in use",766));767}768overlapped_wrapper.in_use = true;769770PipeConnection::write_internal(771&self.handle,772buf,773Some(&mut overlapped_wrapper.overlapped.0),774)?;775Ok(())776}777778/// Helper for `write_overlapped` and `write`.779///780/// # Safety781/// * Safe if overlapped is None.782/// * Safe if overlapped is Some and:783/// + buf lives until the overlapped operation is complete.784/// + overlapped lives until the overlapped operation is complete.785unsafe fn write_internal<T: PipeSendable>(786handle: &SafeDescriptor,787buf: &[T],788overlapped: Option<&mut OVERLAPPED>,789) -> Result<usize> {790// SAFETY:791// Safe because buf points to memory valid until the write completes and we pass a valid792// length for that memory.793unsafe {794crate::windows::write_file(795handle,796buf.as_ptr() as *const u8,797mem::size_of_val(buf),798overlapped,799)800}801}802803/// Sets the blocking mode on the pipe.804pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {805let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();806self.blocking_mode = *blocking_mode;807808// SAFETY:809// Safe because the pipe has not been closed (it is managed by this object).810unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }811}812813/// For a server named pipe, waits for a client to connect (blocking).814pub fn wait_for_client_connection(&self) -> Result<()> {815let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;816self.wait_for_client_connection_internal(817&mut overlapped_wrapper,818/* should_block = */ true,819)820}821822/// Interruptable blocking wait for a client to connect.823pub fn wait_for_client_connection_overlapped_blocking(824&mut self,825exit_event: &Event,826) -> Result<()> {827let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;828self.wait_for_client_connection_internal(829&mut overlapped_wrapper,830/* should_block = */ false,831)?;832833#[derive(EventToken)]834enum Token {835Connected,836Exit,837}838839let wait_ctx = WaitContext::build_with(&[840(841overlapped_wrapper.get_h_event_ref().unwrap(),842Token::Connected,843),844(exit_event, Token::Exit),845])?;846847let events = wait_ctx.wait()?;848if let Some(event) = events.into_iter().next() {849return match event.token {850Token::Connected => Ok(()),851Token::Exit => {852// We must cancel IO here because it is unsafe to free the overlapped wrapper853// while the IO operation is active.854self.cancel_io()?;855856Err(std::io::Error::new(857std::io::ErrorKind::Interrupted,858"IO canceled on exit request",859))860}861};862}863unreachable!("wait cannot return Ok with zero events");864}865866/// For a server named pipe, waits for a client to connect using the given overlapped wrapper867/// to signal connection.868pub fn wait_for_client_connection_overlapped(869&self,870overlapped_wrapper: &mut OverlappedWrapper,871) -> Result<()> {872self.wait_for_client_connection_internal(873overlapped_wrapper,874/* should_block = */ false,875)876}877878fn wait_for_client_connection_internal(879&self,880overlapped_wrapper: &mut OverlappedWrapper,881should_block: bool,882) -> Result<()> {883// SAFETY:884// Safe because the handle is valid and we're checking the return885// code according to the documentation886//887// TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:888// overlapped_wrapper must live until the overlapped operation is complete; however,889// if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper890// could be freed while the operation is still running.891unsafe {892let success_flag = ConnectNamedPipe(893self.as_raw_descriptor(),894// Note: The overlapped structure is only used if the pipe was opened in895// OVERLAPPED mode, but is necessary in that case.896&mut *overlapped_wrapper.overlapped.0,897);898if success_flag == 0 {899return match GetLastError() {900ERROR_PIPE_CONNECTED => {901if !should_block {902// If async, make sure the event is signalled to indicate the client903// is ready.904overlapped_wrapper.get_h_event_ref().unwrap().signal()?;905}906907Ok(())908}909ERROR_IO_PENDING => {910if should_block {911overlapped_wrapper.get_h_event_ref().unwrap().wait()?;912}913Ok(())914}915err => Err(io::Error::from_raw_os_error(err as i32)),916};917}918}919Ok(())920}921922/// Used for overlapped read and write operations.923///924/// This will block until the ReadFile or WriteFile operation that also took in925/// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from926/// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get927/// the number of bytes that were read or written.928pub fn get_overlapped_result(929&mut self,930overlapped_wrapper: &mut OverlappedWrapper,931) -> io::Result<u32> {932let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);933overlapped_wrapper.in_use = false;934res935}936937/// Used for overlapped read and write operations.938///939/// This will return immediately, regardless of the completion status of the940/// ReadFile or WriteFile operation that took in `overlapped_wrapper`,941/// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`942/// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes943/// that were read or written, if completed. If the operation hasn't944/// completed, an error of kind `io::ErrorKind::WouldBlock` will be945/// returned.946pub fn try_get_overlapped_result(947&mut self,948overlapped_wrapper: &mut OverlappedWrapper,949) -> io::Result<u32> {950let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);951match res {952Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {953Err(io::Error::new(io::ErrorKind::WouldBlock, err))954}955_ => {956overlapped_wrapper.in_use = false;957res958}959}960}961962fn get_overlapped_result_internal(963&mut self,964overlapped_wrapper: &mut OverlappedWrapper,965wait: bool,966) -> io::Result<u32> {967if !overlapped_wrapper.in_use {968return Err(std::io::Error::new(969std::io::ErrorKind::InvalidInput,970"Overlapped struct is not in use",971));972}973974let mut size_transferred = 0;975// SAFETY:976// Safe as long as `overlapped_struct` isn't copied and also contains a valid event.977// Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.978if (unsafe {979GetOverlappedResult(980self.handle.as_raw_descriptor(),981&mut *overlapped_wrapper.overlapped.0,982&mut size_transferred,983if wait { TRUE } else { FALSE },984)985}) != 0986{987Ok(size_transferred)988} else {989let e = io::Error::last_os_error();990match e.raw_os_error() {991// More data => partial read of a message on a message pipe. This isn't really an992// error (see PipeConnection::read_internal) since we filled the provided buffer.993Some(error_code) if error_code as u32 == ERROR_MORE_DATA => Ok(size_transferred),994_ => Err(e),995}996}997}998999/// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will1000/// cancel all I/O requests for the file handle passed in.1001pub fn cancel_io(&mut self) -> Result<()> {1002fail_if_zero!(1003// SAFETY: descriptor is valid and the return value is checked.1004unsafe {1005CancelIoEx(1006self.handle.as_raw_descriptor(),1007/* lpOverlapped= */ std::ptr::null_mut(),1008)1009}1010);10111012Ok(())1013}10141015/// Get the framing mode of the pipe.1016pub fn get_framing_mode(&self) -> FramingMode {1017self.framing_mode1018}10191020/// Returns metadata about the connected NamedPipe.1021pub fn get_info(&self) -> Result<NamedPipeInfo> {1022let mut flags: u32 = 0;1023let mut incoming_buffer_size: u32 = 0;1024let mut outgoing_buffer_size: u32 = 0;1025let mut max_instances: u32 = 0;1026// SAFETY: all pointers are valid1027fail_if_zero!(unsafe {1028GetNamedPipeInfo(1029self.as_raw_descriptor(),1030&mut flags,1031&mut outgoing_buffer_size,1032&mut incoming_buffer_size,1033&mut max_instances,1034)1035});10361037Ok(NamedPipeInfo {1038outgoing_buffer_size,1039incoming_buffer_size,1040max_instances,1041flags,1042})1043}10441045/// For a server pipe, flush the pipe contents. This will1046/// block until the pipe is cleared by the client. Only1047/// call this if you are sure the client is reading the1048/// data!1049pub fn flush_data_blocking(&self) -> Result<()> {1050// SAFETY:1051// Safe because the only buffers interacted with are1052// outside of Rust memory1053fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });1054Ok(())1055}10561057/// For a server pipe, disconnect all clients, discarding any buffered data.1058pub fn disconnect_clients(&self) -> Result<()> {1059// SAFETY:1060// Safe because we own the handle passed in and know it will remain valid for the duration1061// of the call. Discarded buffers are not managed by rust.1062fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });1063Ok(())1064}1065}10661067impl AsRawDescriptor for PipeConnection {1068fn as_raw_descriptor(&self) -> RawDescriptor {1069self.handle.as_raw_descriptor()1070}1071}10721073impl IntoRawDescriptor for PipeConnection {1074fn into_raw_descriptor(self) -> RawDescriptor {1075self.handle.into_raw_descriptor()1076}1077}10781079// SAFETY: Send safety is ensured by inner fields.1080unsafe impl Send for PipeConnection {}1081// SAFETY: Sync safety is ensured by inner fields.1082unsafe impl Sync for PipeConnection {}10831084impl io::Read for PipeConnection {1085fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {1086// SAFETY:1087// This is safe because PipeConnection::read is always safe for u81088unsafe { PipeConnection::read(self, buf) }1089}1090}10911092impl io::Write for PipeConnection {1093fn write(&mut self, buf: &[u8]) -> io::Result<usize> {1094PipeConnection::write(self, buf)1095}10961097fn flush(&mut self) -> io::Result<()> {1098Ok(())1099}1100}11011102/// A simple data struct representing1103/// metadata about a NamedPipe.1104#[derive(Debug, PartialEq, Eq)]1105pub struct NamedPipeInfo {1106pub outgoing_buffer_size: u32,1107pub incoming_buffer_size: u32,1108pub max_instances: u32,1109pub flags: u32,1110}11111112/// This is a wrapper around PipeConnection. This allows a read and a write operations1113/// to run in parallel but not multiple reads or writes in parallel.1114///1115/// Reason: The message from/to service are two-parts - a fixed size header that1116/// contains the size of the actual message. By allowing only one write at a time1117/// we ensure that the variable size message is written/read right after writing/reading1118/// fixed size header. For example it avoid sending or receiving in messages in order like1119/// H1, H2, M1, M21120/// - where header H1 and its message M1 are sent by one event loop and H2 and its message M2 are1121/// sent by another event loop.1122///1123/// Do not expose direct access to reader or writer pipes.1124///1125/// The struct is clone-able so that different event loops can talk to the other end.1126#[derive(Clone)]1127pub struct MultiPartMessagePipe {1128// Lock protected pipe to receive messages.1129reader: Arc<Mutex<PipeConnection>>,1130// Lock protected pipe to send messages.1131writer: Arc<Mutex<PipeConnection>>,1132// Whether this end is created as server or client. The variable helps to1133// decide if something meanigful should be done when `wait_for_connection` is called.1134is_server: bool,1135// Always true if pipe is created as client.1136// Defaults to false on server. Updated to true on calling `wait_for_connection`1137// after a client connects.1138is_connected: Arc<AtomicBool>,1139}11401141impl MultiPartMessagePipe {1142fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {1143Ok(Self {1144reader: Arc::new(Mutex::new(pipe.try_clone()?)),1145writer: Arc::new(Mutex::new(pipe)),1146is_server,1147is_connected: Arc::new(AtomicBool::new(false)),1148})1149}11501151/// Create server side of MutiPartMessagePipe.1152/// # Safety1153/// `pipe` must be a server named pipe.1154#[deny(unsafe_op_in_unsafe_fn)]1155pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {1156Self::create_from_pipe(pipe, true)1157}11581159/// Create client side of MutiPartMessagePipe.1160pub fn create_as_client(pipe_name: &str) -> Result<Self> {1161let pipe = create_client_pipe(1162&format!(r"\\.\pipe\{pipe_name}"),1163&FramingMode::Message,1164&BlockingMode::Wait,1165/* overlapped= */ true,1166)?;1167Self::create_from_pipe(pipe, false)1168}11691170/// Create server side of MutiPartMessagePipe.1171pub fn create_as_server(pipe_name: &str) -> Result<Self> {1172let pipe = create_server_pipe(1173&format!(r"\\.\pipe\{pipe_name}",),1174&FramingMode::Message,1175&BlockingMode::Wait,11760,11771024 * 1024,1178true,1179)?;1180// SAFETY: `pipe` is a server named pipe.1181unsafe { Self::create_from_server_pipe(pipe) }1182}11831184/// If the struct is created as a server then waits for client connection to arrive.1185/// It only waits on reader as reader and writer are clones.1186pub fn wait_for_connection(&self) -> Result<()> {1187if self.is_server && !self.is_connected.load(Ordering::Relaxed) {1188self.reader.lock().wait_for_client_connection()?;1189self.is_connected.store(true, Ordering::Relaxed);1190}1191Ok(())1192}11931194fn write_overlapped_blocking_message_internal<T: PipeSendable>(1195pipe: &mut PipeConnection,1196buf: &[T],1197overlapped_wrapper: &mut OverlappedWrapper,1198) -> Result<()> {1199// Safety:1200// `buf` and `overlapped_wrapper` will be in use for the duration of1201// the overlapped operation. These must not be reused and must live until1202// after `get_overlapped_result()` has been called which is done right1203// after this call.1204unsafe {1205pipe.write_overlapped(buf, overlapped_wrapper)?;1206}12071208let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;12091210if size_written_in_bytes as usize != buf.len() {1211return Err(std::io::Error::new(1212std::io::ErrorKind::UnexpectedEof,1213format!(1214"Short write expected:{} found:{}",1215size_written_in_bytes,1216buf.len(),1217),1218));1219}1220Ok(())1221}1222/// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered1223pub fn write_overlapped_blocking_message<T: PipeSendable>(1224&self,1225header: &[T],1226message: &[T],1227overlapped_wrapper: &mut OverlappedWrapper,1228) -> Result<()> {1229let mut writer = self.writer.lock();1230Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;1231Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)1232}12331234/// Reads a variable size message and returns the message on success.1235/// The size of the message is expected to proceed the message in1236/// the form of `header_size` message.1237///1238/// `parse_message_size` lets caller parse the header to extract1239/// message size.1240///1241/// Event on `exit_event` is used to interrupt the blocked read.1242pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(1243&self,1244header_size: usize,1245parse_message_size: F,1246overlapped_wrapper: &mut OverlappedWrapper,1247exit_event: &Event,1248) -> Result<Vec<u8>> {1249let mut pipe = self.reader.lock();1250let mut header = vec![0; header_size];1251header.resize_with(header_size, Default::default);1252pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;1253let message_size = parse_message_size(&header);1254if message_size == 0 {1255return Ok(vec![]);1256}1257let mut buf = vec![];1258buf.resize_with(message_size, Default::default);1259pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;1260Ok(buf)1261}12621263/// Returns the inner named pipe if the current struct is the sole owner of the underlying1264/// named pipe.1265///1266/// Otherwise, [`None`] is returned and the struct is dropped.1267///1268/// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads1269/// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is1270/// possible that all of them will result in [`None`]. This is Due to Rust version1271/// restriction(1.68.2) when this function is introduced). This race condition can be resolved1272/// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].1273///1274/// If the underlying named pipe is a server named pipe, this method allows the caller to1275/// terminate the connection by first flushing the named pipe then disconnecting the clients1276/// idiomatically per1277/// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.1278pub fn into_inner_pipe(self) -> Option<PipeConnection> {1279let piper = Arc::clone(&self.reader);1280drop(self);1281Arc::try_unwrap(piper).ok().map(Mutex::into_inner)1282}1283}12841285impl TryFrom<PipeConnection> for MultiPartMessagePipe {1286type Error = std::io::Error;1287fn try_from(pipe: PipeConnection) -> Result<Self> {1288Self::create_from_pipe(pipe, false)1289}1290}12911292#[cfg(test)]1293mod tests {1294use std::mem::size_of;1295use std::thread::JoinHandle;1296use std::time::Duration;12971298use super::*;12991300#[test]1301fn duplex_pipe_stream() {1302let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();13031304// Test both forward and reverse direction since the underlying APIs are a bit asymmetrical1305// SAFETY: trivially safe with pipe created and return value checked.1306unsafe {1307for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {1308println!("{dir}");13091310sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();13111312// Smaller than what we sent so we get multiple chunks1313let mut recv_buffer: [u8; 4] = [0; 4];13141315let mut size = receiver.read(&mut recv_buffer).unwrap();1316assert_eq!(size, 4);1317assert_eq!(recv_buffer, [75, 77, 54, 82]);13181319size = receiver.read(&mut recv_buffer).unwrap();1320assert_eq!(size, 2);1321assert_eq!(recv_buffer[0..2], [76, 65]);1322}1323}1324}13251326#[test]1327fn available_byte_count_byte_mode() {1328let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();1329p1.write(&[1, 23, 45]).unwrap();1330assert_eq!(p2.get_available_byte_count().unwrap(), 3);13311332// PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should1333// yield the same value.1334assert_eq!(p2.get_available_byte_count().unwrap(), 3);1335}13361337#[test]1338fn available_byte_count_message_mode() {1339let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();1340p1.write(&[1, 23, 45]).unwrap();1341assert_eq!(p2.get_available_byte_count().unwrap(), 3);13421343// PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should1344// yield the same value.1345assert_eq!(p2.get_available_byte_count().unwrap(), 3);1346}13471348#[test]1349fn available_byte_count_message_mode_multiple_messages() {1350let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();1351p1.write(&[1, 2, 3]).unwrap();1352p1.write(&[4, 5]).unwrap();1353assert_eq!(p2.get_available_byte_count().unwrap(), 5);1354}13551356#[test]1357fn duplex_pipe_message() {1358let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();13591360// Test both forward and reverse direction since the underlying APIs are a bit asymmetrical1361// SAFETY: trivially safe with pipe created and return value checked.1362unsafe {1363for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {1364println!("{dir}");13651366// Send 2 messages so that we can check that message framing works1367sender.write(&[1, 23, 45]).unwrap();1368sender.write(&[67, 89, 10]).unwrap();13691370let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages13711372let mut size = receiver.read(&mut recv_buffer).unwrap();1373assert_eq!(size, 3);1374assert_eq!(recv_buffer[0..3], [1, 23, 45]);13751376size = receiver.read(&mut recv_buffer).unwrap();1377assert_eq!(size, 3);1378assert_eq!(recv_buffer[0..3], [67, 89, 10]);1379}1380}1381}13821383#[cfg(test)]1384fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {1385let mut recv_buffer: [u8; 1] = [0; 1];13861387// Test both forward and reverse direction since the underlying APIs are a bit asymmetrical1388// SAFETY: trivially safe with PipeConnection created and return value checked.1389unsafe {1390for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {1391println!("{dir}");1392sender.write(&[1]).unwrap();1393assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!1394assert_eq!(1395receiver.read(&mut recv_buffer).unwrap_err().kind(),1396std::io::ErrorKind::WouldBlock1397);1398}1399}1400}14011402#[test]1403fn duplex_nowait() {1404let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();1405duplex_nowait_helper(&p1, &p2);1406}14071408#[test]1409fn duplex_nowait_set_after_creation() {1410// Tests non blocking setting after pipe creation1411let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();1412p1.set_blocking(&BlockingMode::NoWait)1413.expect("Failed to set blocking mode on pipe p1");1414p2.set_blocking(&BlockingMode::NoWait)1415.expect("Failed to set blocking mode on pipe p2");1416duplex_nowait_helper(&p1, &p2);1417}14181419#[test]1420fn duplex_overlapped() {1421let pipe_name = generate_pipe_name();14221423let mut p1 = create_server_pipe(1424&pipe_name,1425&FramingMode::Message,1426&BlockingMode::Wait,1427/* timeout= */ 0,1428/* buffer_size= */ 1000,1429/* overlapped= */ true,1430)1431.unwrap();14321433let mut p2 = create_client_pipe(1434&pipe_name,1435&FramingMode::Message,1436&BlockingMode::Wait,1437/* overlapped= */ true,1438)1439.unwrap();14401441// SAFETY:1442// Safe because `read_overlapped` can be called since overlapped struct is created.1443unsafe {1444let mut p1_overlapped_wrapper =1445OverlappedWrapper::new(/* include_event= */ true).unwrap();1446p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)1447.unwrap();1448let size = p11449.get_overlapped_result(&mut p1_overlapped_wrapper)1450.unwrap();1451assert_eq!(size, 6);14521453let mut recv_buffer: [u8; 6] = [0; 6];14541455let mut p2_overlapped_wrapper =1456OverlappedWrapper::new(/* include_event= */ true).unwrap();1457p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)1458.unwrap();1459let size = p21460.get_overlapped_result(&mut p2_overlapped_wrapper)1461.unwrap();1462assert_eq!(size, 6);1463assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);1464}1465}14661467#[test]1468fn duplex_overlapped_test_in_use() {1469let pipe_name = generate_pipe_name();14701471let mut p1 = create_server_pipe(1472&pipe_name,1473&FramingMode::Message,1474&BlockingMode::Wait,1475/* timeout= */ 0,1476/* buffer_size= */ 1000,1477/* overlapped= */ true,1478)1479.unwrap();14801481let mut p2 = create_client_pipe(1482&pipe_name,1483&FramingMode::Message,1484&BlockingMode::Wait,1485/* overlapped= */ true,1486)1487.unwrap();1488let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();14891490let res = p1.get_overlapped_result(&mut overlapped_wrapper);1491assert!(res.is_err());14921493let data = vec![75, 77, 54, 82, 76, 65];1494// SAFETY: safe because: data & overlapped wrapper live until the1495// operation is verified completed below.1496let res = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };1497assert!(res.is_ok());14981499let res =1500// SAFETY: safe because we know the unsafe re-use of overlapped wrapper1501// will error out.1502unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };1503assert!(res.is_err());15041505let mut recv_buffer: [u8; 6] = [0; 6];1506// SAFETY: safe because we know the unsafe re-use of overlapped wrapper1507// will error out.1508let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };1509assert!(res.is_err());15101511let res = p1.get_overlapped_result(&mut overlapped_wrapper);1512assert!(res.is_ok());15131514let mut recv_buffer: [u8; 6] = [0; 6];1515// SAFETY: safe because recv_buffer & overlapped_wrapper live until the1516// operation is verified completed below.1517let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };1518assert!(res.is_ok());1519let res = p2.get_overlapped_result(&mut overlapped_wrapper);1520assert!(res.is_ok());1521}15221523fn generate_pipe_name() -> String {1524format!(r"\\.\pipe\test-ipc-pipe-name.rand{}", rand::random::<u64>())1525}15261527fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {1528let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];1529std::thread::spawn(move || {1530let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();1531let exit_event = Event::new().unwrap();1532for _i in 0..msg_count {1533let message = messages[rand::random_range(..messages.len())];1534pipe.write_overlapped_blocking_message(1535&message.len().to_be_bytes(),1536message.as_bytes(),1537&mut overlapped_wrapper,1538)1539.unwrap();1540}1541for _i in 0..msg_count {1542let message = pipe1543.read_overlapped_blocking_message(1544size_of::<usize>(),1545|bytes: &[u8]| {1546assert_eq!(bytes.len(), size_of::<usize>());1547usize::from_be_bytes(1548bytes.try_into().expect("failed to get array from slice"),1549)1550},1551&mut overlapped_wrapper,1552&exit_event,1553)1554.unwrap();1555assert_eq!(1556*messages.get(message.len() - 1).unwrap(),1557std::str::from_utf8(&message).unwrap(),1558);1559}1560})1561}15621563#[test]1564fn multipart_message_smoke_test() {1565let pipe_name = generate_pipe_name();1566let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();1567let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();1568let handles = [1569send_receive_msgs(server.clone(), 100),1570send_receive_msgs(client.clone(), 100),1571send_receive_msgs(server, 100),1572send_receive_msgs(client, 100),1573];1574for h in handles {1575h.join().unwrap();1576}1577}15781579#[test]1580fn multipart_message_into_inner_pipe() {1581let pipe_name = generate_pipe_name();1582let mut pipe = create_server_pipe(1583&format!(r"\\.\pipe\{pipe_name}"),1584&FramingMode::Message,1585&BlockingMode::Wait,15860,15871024 * 1024,1588true,1589)1590.expect("should create the server pipe with success");1591let server1 = {1592let pipe = pipe1593.try_clone()1594.expect("should duplicate the named pipe with success");1595// SAFETY: `pipe` is a server named pipe.1596unsafe { MultiPartMessagePipe::create_from_server_pipe(pipe) }1597.expect("should create the multipart message pipe with success")1598};1599let server2 = server1.clone();1600assert!(1601server2.into_inner_pipe().is_none(),1602"not the last reference, should be None"1603);1604let inner_pipe = server11605.into_inner_pipe()1606.expect("the last reference, should return the underlying pipe");1607// CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use1608// that API to compare if 2 handles are the same.1609pipe.set_blocking(&BlockingMode::NoWait)1610.expect("should set the blocking mode on the original pipe with success");1611assert_eq!(1612pipe.get_info()1613.expect("should get the pipe information on the original pipe successfully"),1614inner_pipe1615.get_info()1616.expect("should get the pipe information on the inner pipe successfully")1617);1618pipe.set_blocking(&BlockingMode::Wait)1619.expect("should set the blocking mode on the original pipe with success");1620assert_eq!(1621pipe.get_info()1622.expect("should get the pipe information on the original pipe successfully"),1623inner_pipe1624.get_info()1625.expect("should get the pipe information on the inner pipe successfully")1626);1627}16281629#[test]1630fn test_wait_for_connection_blocking() {1631let pipe_name = generate_pipe_name();16321633let mut server_pipe = create_server_pipe(1634&pipe_name,1635&FramingMode::Message,1636&BlockingMode::Wait,1637/* timeout= */ 0,1638/* buffer_size= */ 1000,1639/* overlapped= */ true,1640)1641.unwrap();16421643let server = crate::thread::spawn_with_timeout(move || {1644let exit_event = Event::new().unwrap();1645server_pipe1646.wait_for_client_connection_overlapped_blocking(&exit_event)1647.unwrap();1648});16491650let _client = create_client_pipe(1651&pipe_name,1652&FramingMode::Message,1653&BlockingMode::Wait,1654/* overlapped= */ true,1655)1656.unwrap();1657server.try_join(Duration::from_secs(10)).unwrap();1658}16591660#[test]1661fn test_wait_for_connection_blocking_exit_triggered() {1662let pipe_name = generate_pipe_name();16631664let mut server_pipe = create_server_pipe(1665&pipe_name,1666&FramingMode::Message,1667&BlockingMode::Wait,1668/* timeout= */ 0,1669/* buffer_size= */ 1000,1670/* overlapped= */ true,1671)1672.unwrap();16731674let exit_event = Event::new().unwrap();1675let exit_event_for_server = exit_event.try_clone().unwrap();1676let server = crate::thread::spawn_with_timeout(move || {1677assert!(server_pipe1678.wait_for_client_connection_overlapped_blocking(&exit_event_for_server)1679.is_err());1680});1681exit_event.signal().unwrap();1682server.try_join(Duration::from_secs(10)).unwrap();1683}16841685#[test]1686fn std_io_read_eof() {1687let (mut w, mut r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();1688std::io::Write::write(&mut w, &[1, 2, 3]).unwrap();1689std::mem::drop(w);16901691let mut buffer: [u8; 4] = [0; 4];1692assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 3);1693assert_eq!(buffer, [1, 2, 3, 0]);1694assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);1695assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);1696}16971698#[test]1699fn std_io_write_eof() {1700let (mut w, r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();1701std::mem::drop(r);1702let result = std::io::Write::write(&mut w, &[1, 2, 3]);1703// Not required to return BrokenPipe here, something like Ok(0) is also acceptable.1704assert!(1705result.is_err()1706&& result.as_ref().unwrap_err().kind() == std::io::ErrorKind::BrokenPipe,1707"expected Err(BrokenPipe), got {result:?}"1708);1709}1710}171117121713