GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/commands/tunnels.rs

/*---------------------------------------------------------------------------------------------
 * Copyright (c) Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

use async_trait::async_trait;
use base64::{engine::general_purpose as b64, Engine as _};
use futures::{stream::FuturesUnordered, StreamExt};
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::{
	net::{IpAddr, Ipv4Addr, SocketAddr},
	str::FromStr,
	time::Duration,
};
use sysinfo::Pid;
use tokio::{
	io::{AsyncBufReadExt, BufReader},
	sync::watch,
};

use super::{
	args::{
		AuthProvider, CliCore, CommandShellArgs, ExistingTunnelArgs, TunnelArgs, TunnelForwardArgs,
		TunnelRenameArgs, TunnelServeArgs, TunnelServiceSubCommands, TunnelUserSubCommands,
	},
	CommandContext,
};

use crate::{
	async_pipe::{get_socket_name, listen_socket_rw_stream, AsyncRWAccepter},
	auth::Auth,
	constants::{
		APPLICATION_NAME, CONTROL_PORT, IS_A_TTY, TUNNEL_CLI_LOCK_NAME, TUNNEL_SERVICE_LOCK_NAME,
	},
	log,
	state::LauncherPaths,
	tunnels::{
		code_server::CodeServerArgs,
		create_service_manager,
		dev_tunnels::{self, DevTunnels},
		legal, local_forwarding,
		paths::get_all_servers,
		protocol, serve_stream,
		shutdown_signal::ShutdownRequest,
		singleton_client::do_single_rpc_call,
		singleton_server::{
			make_singleton_server, start_singleton_server, BroadcastLogSink, SingletonServerArgs,
		},
		AuthRequired, Next, ServeStreamParams, ServiceContainer, ServiceManager,
	},
	util::{
		app_lock::AppMutex,
		command::new_std_command,
		errors::{wrap, AnyError, CodeError},
		machine::canonical_exe,
		prereqs::PreReqChecker,
	},
};
use crate::{
	singleton::{acquire_singleton, SingletonConnection},
	tunnels::{
		dev_tunnels::ActiveTunnel,
		singleton_client::{start_singleton_client, SingletonClientArgs},
		SleepInhibitor,
	},
};

impl From<AuthProvider> for crate::auth::AuthProvider {
	fn from(auth_provider: AuthProvider) -> Self {
		match auth_provider {
			AuthProvider::Github => crate::auth::AuthProvider::Github,
			AuthProvider::Microsoft => crate::auth::AuthProvider::Microsoft,
		}
	}
}

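/// Builds a `dev_tunnels::ExistingTunnel` from the existing-tunnel CLI arguments,
/// accepting either a combined `<tunnel id>.<cluster>` ID or a separate ID and
/// cluster. A host token is required in both cases; otherwise `None` is returned.
/// The tunnel name falls back to the `--name` argument when not given explicitly.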
fn fulfill_existing_tunnel_args(
	d: ExistingTunnelArgs,
	name_arg: &Option<String>,
) -> Option<dev_tunnels::ExistingTunnel> {
	let tunnel_name = d.tunnel_name.or_else(|| name_arg.clone());

	match (d.tunnel_id, d.cluster, d.host_token) {
		(Some(tunnel_id), None, Some(host_token)) => {
			let i = tunnel_id.find('.')?;
			Some(dev_tunnels::ExistingTunnel {
				tunnel_id: tunnel_id[..i].to_string(),
				cluster: tunnel_id[i + 1..].to_string(),
				tunnel_name,
				host_token,
			})
		}

		(Some(tunnel_id), Some(cluster), Some(host_token)) => Some(dev_tunnels::ExistingTunnel {
			tunnel_id,
			tunnel_name,
			host_token,
			cluster,
		}),

		_ => None,
	}
}

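/// Payload run by the OS service manager: wraps the CLI arguments and, when
/// executed via `run_service`, serves a tunnel with a randomly assigned name
/// to avoid prompting.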
struct TunnelServiceContainer {
	core_args: CliCore,
	tunnel_args: TunnelArgs,
}

impl TunnelServiceContainer {
	fn new(core_args: CliCore, tunnel_args: TunnelArgs) -> Self {
		Self {
			core_args,
			tunnel_args,
		}
	}
}

#[async_trait]
impl ServiceContainer for TunnelServiceContainer {
	async fn run_service(
		&mut self,
		log: log::Logger,
		launcher_paths: LauncherPaths,
	) -> Result<(), AnyError> {
		let mut csa = (&self.core_args).into();
		self.tunnel_args.serve_args.server_args.apply_to(&mut csa);
		serve_with_csa(
			launcher_paths,
			log,
			TunnelServeArgs {
				random_name: true, // avoid prompting
				..Default::default()
			},
			csa,
			TUNNEL_SERVICE_LOCK_NAME,
		)
		.await?;
		Ok(())
	}
}

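/// Runs the command-shell server. Depending on the arguments it listens on a
/// local socket/named pipe, on a TCP host/port (optionally picking a free port
/// from a given range), or serves a single connection over stdin/stderr.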
pub async fn command_shell(ctx: CommandContext, args: CommandShellArgs) -> Result<i32, AnyError> {
	let platform = PreReqChecker::new().verify().await?;
	let mut shutdown_reqs = vec![ShutdownRequest::CtrlC];
	if let Some(p) = args.parent_process_id.and_then(|p| Pid::from_str(&p).ok()) {
		shutdown_reqs.push(ShutdownRequest::ParentProcessKilled(p));
	}

	let mut params = ServeStreamParams {
		log: ctx.log,
		launcher_paths: ctx.paths,
		platform,
		requires_auth: args
			.require_token
			.map(AuthRequired::VSDAWithToken)
			.unwrap_or(AuthRequired::VSDA),
		exit_barrier: ShutdownRequest::create_rx(shutdown_reqs),
		code_server_args: (&ctx.args).into(),
	};

	args.server_args.apply_to(&mut params.code_server_args);

	let mut listener: Box<dyn AsyncRWAccepter> =
		match (args.on_port.first(), &args.on_host, args.on_socket) {
			(_, _, true) => {
				let socket = get_socket_name();
				let listener = listen_socket_rw_stream(&socket)
					.await
					.map_err(|e| wrap(e, "error listening on socket"))?;

				params
					.log
					.result(format!("Listening on {}", socket.display()));

				Box::new(listener)
			}
			(Some(_), _, _) | (_, Some(_), _) => {
				let host = args
					.on_host
					.as_ref()
					.map(|h| h.parse().map_err(CodeError::InvalidHostAddress))
					.unwrap_or(Ok(IpAddr::V4(Ipv4Addr::LOCALHOST)))?;

				let lower_port = args.on_port.first().copied().unwrap_or_default();
				let port_no = if let Some(upper) = args.on_port.get(1) {
					find_unused_port(&host, lower_port, *upper)
						.await
						.unwrap_or_default()
				} else {
					lower_port
				};

				let addr = SocketAddr::new(host, port_no);
				let listener = tokio::net::TcpListener::bind(addr)
					.await
					.map_err(|e| wrap(e, "error listening on port"))?;

				params
					.log
					.result(format!("Listening on {}", listener.local_addr().unwrap()));

				Box::new(listener)
			}
			_ => {
				serve_stream(tokio::io::stdin(), tokio::io::stderr(), params).await;
				return Ok(0);
			}
		};

	let mut servers = FuturesUnordered::new();

	loop {
		tokio::select! {
			Some(_) = servers.next() => {},
			socket = listener.accept_rw() => {
				match socket {
					Ok((read, write)) => servers.push(serve_stream(read, write, params.clone())),
					Err(e) => {
						error!(params.log, &format!("Error accepting connection: {e}"));
						return Ok(1);
					}
				}
			},
			_ = params.exit_barrier.wait() => {
				// wait for all servers to finish up:
				while (servers.next().await).is_some() { }
				return Ok(0);
			}
		}
	}
}

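/// Returns the first port in `start_port..=end_port` that can be bound on `host`.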
async fn find_unused_port(host: &IpAddr, start_port: u16, end_port: u16) -> Option<u16> {
	for port in start_port..=end_port {
		if is_port_available(*host, port).await {
			return Some(port);
		}
	}
	None
}

async fn is_port_available(host: IpAddr, port: u16) -> bool {
	tokio::net::TcpListener::bind(SocketAddr::new(host, port))
		.await
		.is_ok()
}

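/// Builds the argument list used when registering the tunnel background service,
/// forwarding the extensions and server data directories when they are set.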
fn make_service_args<'a: 'c, 'b: 'c, 'c>(
	root_path: &'a str,
	tunnel_args: &'b TunnelArgs,
) -> Vec<&'c str> {
	let mut args = ["--verbose", "--cli-data-dir", root_path, "tunnel"].to_vec();

	if let Some(d) = tunnel_args.serve_args.server_args.extensions_dir.as_ref() {
		args.extend_from_slice(&["--extensions-dir", d]);
	}
	if let Some(d) = tunnel_args.serve_args.server_args.server_data_dir.as_ref() {
		args.extend_from_slice(&["--server-data-dir", d]);
	}

	args.extend_from_slice(&["service", "internal-run"]);

	args
}

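/// Handles the `tunnel service` subcommands: install (registering the service
/// after checking login, tunnel name, and license consent), uninstall, log,
/// and the internal run target invoked by the service manager.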
pub async fn service(
	ctx: CommandContext,
	tunnel_args: TunnelArgs,
	service_args: TunnelServiceSubCommands,
) -> Result<i32, AnyError> {
	let manager = create_service_manager(ctx.log.clone(), &ctx.paths);
	match service_args {
		TunnelServiceSubCommands::Install(args) => {
			let auth = Auth::new(&ctx.paths, ctx.log.clone());

			if let Some(name) = &args.name {
				// ensure the name matches, and the tunnel exists
				dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths)
					.rename_tunnel(name)
					.await?;
			} else {
				// still ensure they're logged in, otherwise subsequent serving will fail
				auth.get_credential().await?;
			}

			// likewise for license consent
			legal::require_consent(&ctx.paths, args.accept_server_license_terms)?;

			let current_exe = canonical_exe().map_err(|e| wrap(e, "could not get current exe"))?;
			let root_path = ctx.paths.root().as_os_str().to_string_lossy();
			let args = make_service_args(&root_path, &tunnel_args);

			manager.register(current_exe, &args).await?;
			ctx.log.result(format!("Service successfully installed! You can use `{APPLICATION_NAME} tunnel service log` to monitor it, and `{APPLICATION_NAME} tunnel service uninstall` to remove it."));
		}
		TunnelServiceSubCommands::Uninstall => {
			manager.unregister().await?;
		}
		TunnelServiceSubCommands::Log => {
			manager.show_logs().await?;
		}
		TunnelServiceSubCommands::InternalRun => {
			manager
				.run(
					ctx.paths.clone(),
					TunnelServiceContainer::new(ctx.args, tunnel_args),
				)
				.await?;
		}
	}

	Ok(0)
}

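/// Handles the `tunnel user` subcommands: login, logout, and show.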
pub async fn user(ctx: CommandContext, user_args: TunnelUserSubCommands) -> Result<i32, AnyError> {
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	match user_args {
		TunnelUserSubCommands::Login(mut login_args) => {
			auth.login(
				login_args.provider.map(|p| p.into()),
				login_args.access_token.take(),
				login_args.refresh_token.take(),
			)
			.await?;
		}
		TunnelUserSubCommands::Logout => {
			auth.clear_credentials()?;
		}
		TunnelUserSubCommands::Show => {
			if let Ok(Some(sc)) = auth.get_current_credential() {
				ctx.log.result(format!("logged in with provider {}", sc.provider));
			} else {
				ctx.log.result("not logged in");
				return Ok(1);
			}
		}
	}

	Ok(0)
}

/// Renames the tunnel used by this gateway.
pub async fn rename(ctx: CommandContext, rename_args: TunnelRenameArgs) -> Result<i32, AnyError> {
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths);
	dt.rename_tunnel(&rename_args.name).await?;
	ctx.log.result(format!(
		"Successfully renamed this tunnel to {}",
		&rename_args.name
	));

	Ok(0)
}

/// Removes the tunnel used by this gateway, if any.
pub async fn unregister(ctx: CommandContext) -> Result<i32, AnyError> {
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&ctx.log, auth, &ctx.paths);
	dt.remove_tunnel().await?;
	Ok(0)
}

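/// Asks the running tunnel singleton to restart via its RPC interface.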
pub async fn restart(ctx: CommandContext) -> Result<i32, AnyError> {
	do_single_rpc_call::<_, ()>(
		&ctx.paths.tunnel_lockfile(),
		ctx.log,
		protocol::singleton::METHOD_RESTART,
		protocol::EmptyObject {},
	)
	.await
	.map(|_| 0)
	.map_err(|e| e.into())
}

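/// Asks the running tunnel singleton to shut down via its RPC interface.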
pub async fn kill(ctx: CommandContext) -> Result<i32, AnyError> {
	do_single_rpc_call::<_, ()>(
		&ctx.paths.tunnel_lockfile(),
		ctx.log,
		protocol::singleton::METHOD_SHUTDOWN,
		protocol::EmptyObject {},
	)
	.await
	.map(|_| 0)
	.map_err(|e| e.into())
}

#[derive(Serialize)]
pub struct StatusOutput {
	pub tunnel: Option<protocol::singleton::StatusWithTunnelName>,
	pub service_installed: bool,
}

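/// Prints the current tunnel status and whether the service is installed, as JSON.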
pub async fn status(ctx: CommandContext) -> Result<i32, AnyError> {
	let tunnel = do_single_rpc_call::<_, protocol::singleton::StatusWithTunnelName>(
		&ctx.paths.tunnel_lockfile(),
		ctx.log.clone(),
		protocol::singleton::METHOD_STATUS,
		protocol::EmptyObject {},
	)
	.await;

	let service_installed = create_service_manager(ctx.log.clone(), &ctx.paths)
		.is_installed()
		.await
		.unwrap_or(false);

	ctx.log.result(
		serde_json::to_string(&StatusOutput {
			service_installed,
			tunnel: match tunnel {
				Ok(s) => Some(s),
				Err(CodeError::NoRunningTunnel | CodeError::AsyncPipeFailed(_)) => None,
				Err(e) => return Err(e.into()),
			},
		})
		.unwrap(),
	);

	Ok(0)
}

/// Removes unused servers.
pub async fn prune(ctx: CommandContext) -> Result<i32, AnyError> {
	get_all_servers(&ctx.paths)
		.into_iter()
		.map(|s| s.server_paths(&ctx.paths))
		.filter(|s| s.get_running_pid().is_none())
		.try_for_each(|s| {
			ctx.log
				.result(format!("Deleted {}", s.server_dir.display()));
			s.delete()
		})
		.map_err(AnyError::from)?;

	ctx.log.result("Successfully removed all unused servers");

	Ok(0)
}

/// Starts the gateway server.
pub async fn serve(ctx: CommandContext, gateway_args: TunnelServeArgs) -> Result<i32, AnyError> {
	let CommandContext {
		log, paths, args, ..
	} = ctx;

	let no_sleep = match gateway_args.no_sleep.then(SleepInhibitor::new) {
		Some(i) => match i.await {
			Ok(i) => Some(i),
			Err(e) => {
				warning!(log, "Could not inhibit sleep: {}", e);
				None
			}
		},
		None => None,
	};

	legal::require_consent(&paths, gateway_args.accept_server_license_terms)?;

	let mut csa = (&args).into();
	gateway_args.server_args.apply_to(&mut csa);
	let result = serve_with_csa(paths, log, gateway_args, csa, TUNNEL_CLI_LOCK_NAME).await;
	drop(no_sleep);

	result
}

/// Internal command used by port forwarding. It reads requests for forwarded ports
/// on lines from stdin, as JSON. It uses singleton logic as well (though on
/// a different tunnel than the main one used for the control server) so that
/// all forward requests on a single machine go through a single hosted tunnel
/// process. Without singleton logic, requests could get routed to processes
/// that aren't forwarding a given port and then fail.
pub async fn forward(
	ctx: CommandContext,
	mut forward_args: TunnelForwardArgs,
) -> Result<i32, AnyError> {
	// Spooky: check IS_A_TTY before starting the stdin reader, since IS_A_TTY will
	// access stdin but a lock will later be held on stdin by the line-reader.
	if *IS_A_TTY {
		trace!(ctx.log, "port forwarding is an internal preview feature");
	}

	// #region stdin reading logic:
	let (own_ports_tx, own_ports_rx) = watch::channel(vec![]);
	let ports_process_log = ctx.log.clone();
	tokio::spawn(async move {
		let mut lines = BufReader::new(tokio::io::stdin()).lines();
		while let Ok(Some(line)) = lines.next_line().await {
			match serde_json::from_str(&line) {
				Ok(p) => {
					let _ = own_ports_tx.send(p);
				}
				Err(e) => warning!(ports_process_log, "error parsing ports: {}", e),
			}
		}
	});

	// #region singleton acquisition
	let shutdown = ShutdownRequest::create_rx([ShutdownRequest::CtrlC]);
	let server = loop {
		if shutdown.is_open() {
			return Ok(0);
		}

		match acquire_singleton(&ctx.paths.forwarding_lockfile()).await {
			Ok(SingletonConnection::Client(stream)) => {
				debug!(ctx.log, "starting as client to singleton");
				let r = local_forwarding::client(local_forwarding::SingletonClientArgs {
					log: ctx.log.clone(),
					shutdown: shutdown.clone(),
					stream,
					port_requests: own_ports_rx.clone(),
				})
				.await;
				if let Err(e) = r {
					warning!(ctx.log, "error contacting forwarding singleton: {}", e);
				}
			}
			Ok(SingletonConnection::Singleton(server)) => break server,
			Err(e) => {
				warning!(ctx.log, "error accessing singleton, retrying: {}", e);
				tokio::time::sleep(Duration::from_secs(2)).await
			}
		}
	};

	// #region singleton handler
	let auth = Auth::new(&ctx.paths, ctx.log.clone());
	if let (Some(p), Some(at)) = (
		forward_args.login.provider.take(),
		forward_args.login.access_token.take(),
	) {
		auth.login(
			Some(p.into()),
			Some(at),
			forward_args.login.refresh_token.take(),
		)
		.await?;
	}

	let mut tunnels = DevTunnels::new_port_forwarding(&ctx.log, auth, &ctx.paths);
	let tunnel = tunnels
		.start_new_launcher_tunnel(None, true, &forward_args.ports)
		.await?;

	local_forwarding::server(ctx.log, tunnel, server, own_ports_rx, shutdown).await?;

	Ok(0)
}

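/// Derives a stable connection token from the tunnel ID: a URL-safe base64
/// encoding of its SHA-256 hash, prefixed with 'a' if it would start with '-'.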
fn get_connection_token(tunnel: &ActiveTunnel) -> String {
	let mut hash = Sha256::new();
	hash.update(tunnel.id.as_bytes());
	let result = hash.finalize();
	let mut result = b64::URL_SAFE_NO_PAD.encode(result);
	if result.starts_with('-') {
		result.insert(0, 'a'); // avoid arg parsing issue
	}

	result
}

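/// Shared implementation behind `serve` and the background service: acquires the
/// tunnel singleton (or defers to an existing one), starts the control server,
/// and handles restart/respawn requests until shutdown.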
async fn serve_with_csa(
	paths: LauncherPaths,
	mut log: log::Logger,
	gateway_args: TunnelServeArgs,
	mut csa: CodeServerArgs,
	app_mutex_name: Option<&'static str>,
) -> Result<i32, AnyError> {
	let log_broadcast = BroadcastLogSink::new();
	log = log.tee(log_broadcast.clone());
	log::install_global_logger(log.clone()); // re-install so that library logs are captured

	debug!(
		log,
		"Starting tunnel with `{} {}`",
		APPLICATION_NAME,
		std::env::args().collect::<Vec<_>>().join(" ")
	);

	// Intentionally read before starting the server. If the server updated and
	// respawn is requested, the old binary will get renamed, and then
	// current_exe will point to the wrong path.
	let current_exe = std::env::current_exe().unwrap();

	let mut vec = vec![
		ShutdownRequest::CtrlC,
		ShutdownRequest::ExeUninstalled(current_exe.to_owned()),
	];
	if let Some(p) = gateway_args
		.parent_process_id
		.and_then(|p| Pid::from_str(&p).ok())
	{
		vec.push(ShutdownRequest::ParentProcessKilled(p));
	}
	let mut shutdown = ShutdownRequest::create_rx(vec);

	let server = loop {
		if shutdown.is_open() {
			return Ok(0);
		}

		match acquire_singleton(&paths.tunnel_lockfile()).await {
			Ok(SingletonConnection::Client(stream)) => {
				debug!(log, "starting as client to singleton");
				if gateway_args.name.is_some()
					|| !gateway_args.server_args.install_extension.is_empty()
					|| gateway_args.tunnel.tunnel_id.is_some()
				{
					warning!(
						log,
						"Command-line options will not be applied until the existing tunnel exits."
					);
				}

				let should_exit = start_singleton_client(SingletonClientArgs {
					log: log.clone(),
					shutdown: shutdown.clone(),
					stream,
				})
				.await;
				if should_exit {
					return Ok(0);
				}
			}
			Ok(SingletonConnection::Singleton(server)) => break server,
			Err(e) => {
				warning!(log, "error accessing singleton, retrying: {}", e);
				tokio::time::sleep(Duration::from_secs(2)).await
			}
		}
	};

	debug!(log, "starting as new singleton");

	let mut server =
		make_singleton_server(log_broadcast.clone(), log.clone(), server, shutdown.clone());
	let platform = spanf!(log, log.span("prereq"), PreReqChecker::new().verify())?;
	let _lock = app_mutex_name.map(AppMutex::new);

	let auth = Auth::new(&paths, log.clone());
	let mut dt = dev_tunnels::DevTunnels::new_remote_tunnel(&log, auth, &paths);
	loop {
		let tunnel = if let Some(t) =
			fulfill_existing_tunnel_args(gateway_args.tunnel.clone(), &gateway_args.name)
		{
			dt.start_existing_tunnel(t).await
		} else {
			tokio::select! {
				t = dt.start_new_launcher_tunnel(gateway_args.name.as_deref(), gateway_args.random_name, &[CONTROL_PORT]) => t,
				_ = shutdown.wait() => return Ok(1),
			}
		}?;

		csa.connection_token = Some(get_connection_token(&tunnel));

		let mut r = start_singleton_server(SingletonServerArgs {
			log: log.clone(),
			tunnel,
			paths: &paths,
			code_server_args: &csa,
			platform,
			log_broadcast: &log_broadcast,
			shutdown: shutdown.clone(),
			server: &mut server,
		})
		.await?;
		r.tunnel.close().await.ok();

		match r.next {
			Next::Respawn => {
				warning!(log, "respawn requested, starting new server");
				// reuse current args, but specify no-forward since tunnels will
				// already be running in this process, and we cannot do a login
				let args = std::env::args().skip(1).collect::<Vec<String>>();
				let exit = new_std_command(current_exe)
					.args(args)
					.spawn()
					.map_err(|e| wrap(e, "error respawning after update"))?
					.wait()
					.map_err(|e| wrap(e, "error waiting for child"))?;

				return Ok(exit.code().unwrap_or(1));
			}
			Next::Exit => {
				debug!(log, "Tunnel shut down");
				return Ok(0);
			}
			Next::Restart => continue,
		}
	}
}