Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/async_pipe.rs
3309 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
use crate::{constants::APPLICATION_NAME, util::errors::CodeError};
7
use async_trait::async_trait;
8
use std::path::{Path, PathBuf};
9
use std::pin::Pin;
10
use std::task::{Context, Poll};
11
use tokio::io::{AsyncRead, AsyncWrite};
12
use tokio::net::TcpListener;
13
use uuid::Uuid;
14
15
// todo: we could probably abstract this into some crate, if one doesn't already exist
16
17
cfg_if::cfg_if! {
18
if #[cfg(unix)] {
19
/// Bidirectional pipe type on Unix: an alias for Tokio's `UnixStream`.
pub type AsyncPipe = tokio::net::UnixStream;
20
/// Owned write half of a Unix pipe, as returned by `socket_stream_split`.
pub type AsyncPipeWriteHalf = tokio::net::unix::OwnedWriteHalf;
21
/// Owned read half of a Unix pipe, as returned by `socket_stream_split`.
pub type AsyncPipeReadHalf = tokio::net::unix::OwnedReadHalf;
22
23
pub async fn get_socket_rw_stream(path: &Path) -> Result<AsyncPipe, CodeError> {
24
tokio::net::UnixStream::connect(path)
25
.await
26
.map_err(CodeError::AsyncPipeFailed)
27
}
28
29
pub async fn listen_socket_rw_stream(path: &Path) -> Result<AsyncPipeListener, CodeError> {
30
tokio::net::UnixListener::bind(path)
31
.map(AsyncPipeListener)
32
.map_err(CodeError::AsyncPipeListenerFailed)
33
}
34
35
/// Listener for incoming pipe connections; on Unix it wraps a Tokio `UnixListener`.
pub struct AsyncPipeListener(tokio::net::UnixListener);
36
37
impl AsyncPipeListener {
38
pub async fn accept(&mut self) -> Result<AsyncPipe, CodeError> {
39
self.0.accept().await.map_err(CodeError::AsyncPipeListenerFailed).map(|(s, _)| s)
40
}
41
}
42
43
/// Consumes `pipe` and returns its owned `(read, write)` halves.
pub fn socket_stream_split(pipe: AsyncPipe) -> (AsyncPipeReadHalf, AsyncPipeWriteHalf) {
    let (read_half, write_half) = pipe.into_split();
    (read_half, write_half)
}
46
} else {
47
use tokio::{time::sleep, io::ReadBuf};
48
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions, NamedPipeClient, NamedPipeServer};
49
use std::{time::Duration, io};
50
use pin_project::pin_project;
51
52
#[pin_project(project = AsyncPipeProj)]
53
pub enum AsyncPipe {
54
PipeClient(#[pin] NamedPipeClient),
55
PipeServer(#[pin] NamedPipeServer),
56
}
57
58
impl AsyncRead for AsyncPipe {
59
fn poll_read(
60
self: Pin<&mut Self>,
61
cx: &mut Context<'_>,
62
buf: &mut ReadBuf<'_>,
63
) -> Poll<io::Result<()>> {
64
match self.project() {
65
AsyncPipeProj::PipeClient(c) => c.poll_read(cx, buf),
66
AsyncPipeProj::PipeServer(c) => c.poll_read(cx, buf),
67
}
68
}
69
}
70
71
impl AsyncWrite for AsyncPipe {
72
fn poll_write(
73
self: Pin<&mut Self>,
74
cx: &mut Context<'_>,
75
buf: &[u8],
76
) -> Poll<io::Result<usize>> {
77
match self.project() {
78
AsyncPipeProj::PipeClient(c) => c.poll_write(cx, buf),
79
AsyncPipeProj::PipeServer(c) => c.poll_write(cx, buf),
80
}
81
}
82
83
fn poll_write_vectored(
84
self: Pin<&mut Self>,
85
cx: &mut Context<'_>,
86
bufs: &[io::IoSlice<'_>],
87
) -> Poll<Result<usize, io::Error>> {
88
match self.project() {
89
AsyncPipeProj::PipeClient(c) => c.poll_write_vectored(cx, bufs),
90
AsyncPipeProj::PipeServer(c) => c.poll_write_vectored(cx, bufs),
91
}
92
}
93
94
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
95
match self.project() {
96
AsyncPipeProj::PipeClient(c) => c.poll_flush(cx),
97
AsyncPipeProj::PipeServer(c) => c.poll_flush(cx),
98
}
99
}
100
101
fn is_write_vectored(&self) -> bool {
102
match self {
103
AsyncPipe::PipeClient(c) => c.is_write_vectored(),
104
AsyncPipe::PipeServer(c) => c.is_write_vectored(),
105
}
106
}
107
108
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
109
match self.project() {
110
AsyncPipeProj::PipeClient(c) => c.poll_shutdown(cx),
111
AsyncPipeProj::PipeServer(c) => c.poll_shutdown(cx),
112
}
113
}
114
}
115
116
/// Write half of a Windows pipe, as produced by `tokio::io::split`.
pub type AsyncPipeWriteHalf = tokio::io::WriteHalf<AsyncPipe>;
117
/// Read half of a Windows pipe, as produced by `tokio::io::split`.
pub type AsyncPipeReadHalf = tokio::io::ReadHalf<AsyncPipe>;
118
119
pub async fn get_socket_rw_stream(path: &Path) -> Result<AsyncPipe, CodeError> {
120
// Tokio says we can need to try in a loop. Do so.
121
// https://docs.rs/tokio/latest/tokio/net/windows/named_pipe/struct.NamedPipeClient.html
122
let client = loop {
123
match ClientOptions::new().open(path) {
124
Ok(client) => break client,
125
// ERROR_PIPE_BUSY https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
126
Err(e) if e.raw_os_error() == Some(231) => sleep(Duration::from_millis(100)).await,
127
Err(e) => return Err(CodeError::AsyncPipeFailed(e)),
128
}
129
};
130
131
Ok(AsyncPipe::PipeClient(client))
132
}
133
134
pub struct AsyncPipeListener {
135
path: PathBuf,
136
server: NamedPipeServer
137
}
138
139
impl AsyncPipeListener {
140
pub async fn accept(&mut self) -> Result<AsyncPipe, CodeError> {
141
// see https://docs.rs/tokio/latest/tokio/net/windows/named_pipe/struct.NamedPipeServer.html
142
// this is a bit weird in that the server becomes the client once
143
// they get a connection, and we create a new client.
144
145
self.server
146
.connect()
147
.await
148
.map_err(CodeError::AsyncPipeListenerFailed)?;
149
150
// Construct the next server to be connected before sending the one
151
// we already have of onto a task. This ensures that the server
152
// isn't closed (after it's done in the task) before a new one is
153
// available. Otherwise the client might error with
154
// `io::ErrorKind::NotFound`.
155
let next_server = ServerOptions::new()
156
.create(&self.path)
157
.map_err(CodeError::AsyncPipeListenerFailed)?;
158
159
160
Ok(AsyncPipe::PipeServer(std::mem::replace(&mut self.server, next_server)))
161
}
162
}
163
164
pub async fn listen_socket_rw_stream(path: &Path) -> Result<AsyncPipeListener, CodeError> {
165
let server = ServerOptions::new()
166
.first_pipe_instance(true)
167
.create(path)
168
.map_err(CodeError::AsyncPipeListenerFailed)?;
169
170
Ok(AsyncPipeListener { path: path.to_owned(), server })
171
}
172
173
pub fn socket_stream_split(pipe: AsyncPipe) -> (AsyncPipeReadHalf, AsyncPipeWriteHalf) {
174
tokio::io::split(pipe)
175
}
176
}
177
}
178
179
impl AsyncPipeListener {
180
pub fn into_pollable(self) -> PollableAsyncListener {
181
PollableAsyncListener {
182
listener: Some(self),
183
write_fut: tokio_util::sync::ReusableBoxFuture::new(make_accept_fut(None)),
184
}
185
}
186
}
187
188
pub struct PollableAsyncListener {
189
listener: Option<AsyncPipeListener>,
190
write_fut: tokio_util::sync::ReusableBoxFuture<
191
'static,
192
(AsyncPipeListener, Result<AsyncPipe, CodeError>),
193
>,
194
}
195
196
async fn make_accept_fut(
197
data: Option<AsyncPipeListener>,
198
) -> (AsyncPipeListener, Result<AsyncPipe, CodeError>) {
199
match data {
200
Some(mut l) => {
201
let c = l.accept().await;
202
(l, c)
203
}
204
None => unreachable!("this future should not be pollable in this state"),
205
}
206
}
207
208
impl hyper::server::accept::Accept for PollableAsyncListener {
209
type Conn = AsyncPipe;
210
type Error = CodeError;
211
212
fn poll_accept(
213
mut self: Pin<&mut Self>,
214
cx: &mut Context<'_>,
215
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
216
if let Some(l) = self.listener.take() {
217
self.write_fut.set(make_accept_fut(Some(l)))
218
}
219
220
match self.write_fut.poll(cx) {
221
Poll::Ready((l, cnx)) => {
222
self.listener = Some(l);
223
Poll::Ready(Some(cnx))
224
}
225
Poll::Pending => Poll::Pending,
226
}
227
}
228
}
229
230
/// Gets a random name for a pipe/socket on the platform
231
pub fn get_socket_name() -> PathBuf {
232
cfg_if::cfg_if! {
233
if #[cfg(unix)] {
234
std::env::temp_dir().join(format!("{}-{}", APPLICATION_NAME, Uuid::new_v4()))
235
} else {
236
PathBuf::from(format!(r"\\.\pipe\{}-{}", APPLICATION_NAME, Uuid::new_v4()))
237
}
238
}
239
}
240
241
pub type AcceptedRW = (
242
Box<dyn AsyncRead + Send + Unpin>,
243
Box<dyn AsyncWrite + Send + Unpin>,
244
);
245
246
#[async_trait]
247
pub trait AsyncRWAccepter {
248
async fn accept_rw(&mut self) -> Result<AcceptedRW, CodeError>;
249
}
250
251
#[async_trait]
252
impl AsyncRWAccepter for AsyncPipeListener {
253
async fn accept_rw(&mut self) -> Result<AcceptedRW, CodeError> {
254
let pipe = self.accept().await?;
255
let (read, write) = socket_stream_split(pipe);
256
Ok((Box::new(read), Box::new(write)))
257
}
258
}
259
260
#[async_trait]
261
impl AsyncRWAccepter for TcpListener {
262
async fn accept_rw(&mut self) -> Result<AcceptedRW, CodeError> {
263
let (stream, _) = self
264
.accept()
265
.await
266
.map_err(CodeError::AsyncPipeListenerFailed)?;
267
let (read, write) = tokio::io::split(stream);
268
Ok((Box::new(read), Box::new(write)))
269
}
270
}
271
272