Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/tunnels/singleton_client.rs
3314 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
use std::{
7
path::Path,
8
sync::{
9
atomic::{AtomicBool, Ordering},
10
Arc,
11
},
12
thread,
13
};
14
15
use const_format::concatcp;
16
use tokio::sync::mpsc;
17
18
use crate::{
19
async_pipe::{socket_stream_split, AsyncPipe},
20
constants::IS_INTERACTIVE_CLI,
21
json_rpc::{new_json_rpc, start_json_rpc, JsonRpcSerializer},
22
log,
23
rpc::RpcCaller,
24
singleton::connect_as_client,
25
tunnels::{code_server::print_listening, protocol::EmptyObject},
26
util::{errors::CodeError, sync::Barrier},
27
};
28
29
use super::{
30
protocol,
31
shutdown_signal::{ShutdownRequest, ShutdownSignal},
32
};
33
34
pub struct SingletonClientArgs {
35
pub log: log::Logger,
36
pub stream: AsyncPipe,
37
pub shutdown: Barrier<ShutdownSignal>,
38
}
39
40
struct SingletonServerContext {
41
log: log::Logger,
42
exit_entirely: Arc<AtomicBool>,
43
caller: RpcCaller<JsonRpcSerializer>,
44
}
45
46
const CONTROL_INSTRUCTIONS_COMMON: &str =
47
"Connected to an existing tunnel process running on this machine.";
48
49
const CONTROL_INSTRUCTIONS_INTERACTIVE: &str = concatcp!(
50
CONTROL_INSTRUCTIONS_COMMON,
51
" You can press:
52
53
- \"x\" + Enter to stop the tunnel and exit
54
- \"r\" + Enter to restart the tunnel
55
- Ctrl+C to detach
56
"
57
);
58
59
/// Serves a client singleton. Returns true if the process should exit after
60
/// this returns, instead of trying to start a tunnel.
61
pub async fn start_singleton_client(args: SingletonClientArgs) -> bool {
62
let mut rpc = new_json_rpc();
63
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
64
let exit_entirely = Arc::new(AtomicBool::new(false));
65
66
debug!(
67
args.log,
68
"An existing tunnel is running on this machine, connecting to it..."
69
);
70
71
if *IS_INTERACTIVE_CLI {
72
let stdin_handle = rpc.get_caller(msg_tx.clone());
73
thread::spawn(move || {
74
let mut input = String::new();
75
loop {
76
input.truncate(0);
77
match std::io::stdin().read_line(&mut input) {
78
Err(_) | Ok(0) => return, // EOF or not a tty
79
_ => {}
80
};
81
82
match input.chars().next().map(|c| c.to_ascii_lowercase()) {
83
Some('x') => {
84
stdin_handle.notify(protocol::singleton::METHOD_SHUTDOWN, EmptyObject {});
85
return;
86
}
87
Some('r') => {
88
stdin_handle.notify(protocol::singleton::METHOD_RESTART, EmptyObject {});
89
}
90
Some(_) | None => {}
91
}
92
}
93
});
94
}
95
96
let caller = rpc.get_caller(msg_tx);
97
let mut rpc = rpc.methods(SingletonServerContext {
98
log: args.log.clone(),
99
exit_entirely: exit_entirely.clone(),
100
caller,
101
});
102
103
rpc.register_sync(protocol::singleton::METHOD_SHUTDOWN, |_: EmptyObject, c| {
104
c.exit_entirely.store(true, Ordering::SeqCst);
105
Ok(())
106
});
107
108
rpc.register_async(
109
protocol::singleton::METHOD_LOG_REPLY_DONE,
110
|_: EmptyObject, c| async move {
111
c.log.result(if *IS_INTERACTIVE_CLI {
112
CONTROL_INSTRUCTIONS_INTERACTIVE
113
} else {
114
CONTROL_INSTRUCTIONS_COMMON
115
});
116
117
let res = c
118
.caller
119
.call::<_, _, protocol::singleton::StatusWithTunnelName>(
120
protocol::singleton::METHOD_STATUS,
121
protocol::EmptyObject {},
122
);
123
124
// we want to ensure the "listening" string always gets printed for
125
// consumers (i.e. VS Code). Ask for it. If the tunnel is not currently
126
// connected though, it will be soon, and that'll be in the log replays.
127
if let Ok(Ok(s)) = res.await {
128
if let Some(name) = s.name {
129
print_listening(&c.log, &name);
130
}
131
}
132
133
Ok(())
134
},
135
);
136
137
rpc.register_sync(
138
protocol::singleton::METHOD_LOG,
139
|log: protocol::singleton::LogMessageOwned, c| {
140
match log.level {
141
Some(level) => c.log.emit(level, &format!("{}{}", log.prefix, log.message)),
142
None => c.log.result(format!("{}{}", log.prefix, log.message)),
143
}
144
Ok(())
145
},
146
);
147
148
let (read, write) = socket_stream_split(args.stream);
149
let _ = start_json_rpc(rpc.build(args.log), read, write, msg_rx, args.shutdown).await;
150
151
exit_entirely.load(Ordering::SeqCst)
152
}
153
154
pub async fn do_single_rpc_call<
155
P: serde::Serialize + 'static,
156
R: serde::de::DeserializeOwned + Send + 'static,
157
>(
158
lock_file: &Path,
159
log: log::Logger,
160
method: &'static str,
161
params: P,
162
) -> Result<R, CodeError> {
163
let client = match connect_as_client(lock_file).await {
164
Ok(p) => p,
165
Err(CodeError::SingletonLockfileOpenFailed(_))
166
| Err(CodeError::SingletonLockedProcessExited(_)) => {
167
return Err(CodeError::NoRunningTunnel);
168
}
169
Err(e) => return Err(e),
170
};
171
172
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
173
let mut rpc = new_json_rpc();
174
let caller = rpc.get_caller(msg_tx);
175
let (read, write) = socket_stream_split(client);
176
177
let rpc = tokio::spawn(async move {
178
start_json_rpc(
179
rpc.methods(()).build(log),
180
read,
181
write,
182
msg_rx,
183
ShutdownRequest::create_rx([ShutdownRequest::CtrlC]),
184
)
185
.await
186
.unwrap();
187
});
188
189
let r = caller.call(method, params).await.unwrap();
190
rpc.abort();
191
r.map_err(CodeError::TunnelRpcCallFailed)
192
}
193
194