Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/tunnels/agent_host.rs
13383 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
use std::convert::Infallible;
7
use std::path::PathBuf;
8
use std::sync::Arc;
9
use std::time::{Duration, Instant};
10
11
use ::http::{Request, Response};
12
use http_body_util::BodyExt;
13
use hyper::body::Incoming;
14
use hyper_util::rt::TokioIo;
15
use tokio::io::{AsyncBufReadExt, BufReader};
16
use tokio::sync::Mutex;
17
18
use crate::async_pipe::{get_socket_name, get_socket_rw_stream, AsyncPipe};
19
use crate::constants::VSCODE_CLI_QUALITY;
20
use crate::download_cache::DownloadCache;
21
use crate::log;
22
use crate::options::Quality;
23
use crate::update_service::{
24
unzip_downloaded_release, Platform, Release, TargetKind, UpdateService,
25
};
26
use crate::util::command::new_script_command;
27
use crate::util::errors::CodeError;
28
use crate::util::http::{self, BoxedHttp};
29
use crate::util::http::{empty_body, full_body, HyperBody};
30
use crate::util::io::SilentCopyProgress;
31
use crate::util::sync::{new_barrier, Barrier, BarrierOpener};
32
33
use super::paths::{get_server_folder_name, SERVER_FOLDER_NAME};
34
35
/// Interval between checks for a newer server release (six hours).
pub const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(6 * 3600);

/// Delay between polls of the running-server state while a discovered
/// update is waiting for the server to exit (ten minutes).
pub const UPDATE_POLL_INTERVAL: Duration = Duration::from_secs(600);

/// Maximum time a freshly spawned server is given to signal readiness
/// (thirty seconds).
pub const STARTUP_TIMEOUT: Duration = Duration::from_secs(30);
41
42
/// Options that control how the agent host server process is launched.
#[derive(Debug, Clone)]
pub struct AgentHostConfig {
    /// When set, forwarded to the server as `--server-data-dir <value>`.
    pub server_data_dir: Option<String>,
    /// When `true`, the server is launched with `--without-connection-token`.
    pub without_connection_token: bool,
    /// Literal connection token.
    // NOTE(review): this field is not read by any code visible in this
    // file; confirm where (or whether) it is consumed.
    pub connection_token: Option<String>,
    /// When set, forwarded to the server as `--connection-token-file <value>`.
    pub connection_token_file: Option<String>,
}
50
51
/// State of the running VS Code server process.
52
struct RunningServer {
53
child: tokio::process::Child,
54
commit: String,
55
}
56
57
/// Manages the VS Code server lifecycle: on-demand start, auto-restart
58
/// after idle shutdown, and background update checking.
59
pub struct AgentHostManager {
60
log: log::Logger,
61
config: AgentHostConfig,
62
platform: Platform,
63
cache: DownloadCache,
64
update_service: UpdateService,
65
/// The latest known release, with the time it was checked.
66
latest_release: Mutex<Option<(Instant, Release)>>,
67
/// The currently running server, if any.
68
running: Mutex<Option<RunningServer>>,
69
/// Barrier that opens when a server is ready (socket path available).
70
/// Reset each time a new server is started.
71
ready: Mutex<Option<Barrier<Result<PathBuf, String>>>>,
72
}
73
74
impl AgentHostManager {
75
pub fn new(
76
log: log::Logger,
77
platform: Platform,
78
cache: DownloadCache,
79
http: BoxedHttp,
80
config: AgentHostConfig,
81
) -> Arc<Self> {
82
Arc::new(Self {
83
update_service: UpdateService::new(log.clone(), http),
84
log,
85
config,
86
platform,
87
cache,
88
latest_release: Mutex::new(None),
89
running: Mutex::new(None),
90
ready: Mutex::new(None),
91
})
92
}
93
94
/// Returns the socket path to a running server, starting one if needed.
95
pub async fn ensure_server(self: &Arc<Self>) -> Result<PathBuf, CodeError> {
96
// Fast path: if we already have a barrier, wait on it
97
{
98
let ready = self.ready.lock().await;
99
if let Some(barrier) = &*ready {
100
if barrier.is_open() {
101
// Check if the process is still running
102
let running = self.running.lock().await;
103
if running.is_some() {
104
return barrier
105
.clone()
106
.wait()
107
.await
108
.unwrap()
109
.map_err(CodeError::ServerDownloadError);
110
}
111
} else {
112
// Still starting up, wait for it
113
let mut barrier = barrier.clone();
114
drop(ready);
115
return barrier
116
.wait()
117
.await
118
.unwrap()
119
.map_err(CodeError::ServerDownloadError);
120
}
121
}
122
}
123
124
// Need to start a new server
125
self.start_server().await
126
}
127
128
/// Starts the server with the latest already-downloaded version.
129
/// Only blocks on a network fetch if no version has been downloaded yet.
130
async fn start_server(self: &Arc<Self>) -> Result<PathBuf, CodeError> {
131
let (release, server_dir) = self.get_cached_or_download().await?;
132
133
let (mut barrier, opener) = new_barrier::<Result<PathBuf, String>>();
134
{
135
let mut ready = self.ready.lock().await;
136
*ready = Some(barrier.clone());
137
}
138
139
let self_clone = self.clone();
140
let release_clone = release.clone();
141
tokio::spawn(async move {
142
self_clone
143
.run_server(release_clone, server_dir, opener)
144
.await;
145
});
146
147
barrier
148
.wait()
149
.await
150
.unwrap()
151
.map_err(CodeError::ServerDownloadError)
152
}
153
154
/// Runs the server process to completion, handling readiness signaling.
155
async fn run_server(
156
self: &Arc<Self>,
157
release: Release,
158
server_dir: PathBuf,
159
opener: BarrierOpener<Result<PathBuf, String>>,
160
) {
161
let executable = if let Some(p) = option_env!("VSCODE_CLI_OVERRIDE_SERVER_PATH") {
162
PathBuf::from(p)
163
} else {
164
server_dir
165
.join(SERVER_FOLDER_NAME)
166
.join("bin")
167
.join(release.quality.server_entrypoint())
168
};
169
170
let agent_host_socket = get_socket_name();
171
let mut cmd = new_script_command(&executable);
172
cmd.stdin(std::process::Stdio::null());
173
cmd.stderr(std::process::Stdio::piped());
174
cmd.stdout(std::process::Stdio::piped());
175
cmd.arg("--socket-path");
176
cmd.arg(get_socket_name());
177
cmd.arg("--agent-host-path");
178
cmd.arg(&agent_host_socket);
179
cmd.args([
180
"--start-server",
181
"--accept-server-license-terms",
182
"--enable-remote-auto-shutdown",
183
]);
184
185
if let Some(a) = &self.config.server_data_dir {
186
cmd.arg("--server-data-dir");
187
cmd.arg(a);
188
}
189
if self.config.without_connection_token {
190
cmd.arg("--without-connection-token");
191
}
192
if let Some(ct) = &self.config.connection_token_file {
193
cmd.arg("--connection-token-file");
194
cmd.arg(ct);
195
}
196
cmd.env_remove("VSCODE_DEV");
197
198
let mut child = match cmd.spawn() {
199
Ok(c) => c,
200
Err(e) => {
201
opener.open(Err(e.to_string()));
202
return;
203
}
204
};
205
206
let commit_prefix = &release.commit[..release.commit.len().min(7)];
207
let (mut stdout, mut stderr) = (
208
BufReader::new(child.stdout.take().unwrap()).lines(),
209
BufReader::new(child.stderr.take().unwrap()).lines(),
210
);
211
212
// Wait for readiness with a timeout
213
let mut opener = Some(opener);
214
let socket_path = agent_host_socket.clone();
215
let startup_deadline = tokio::time::sleep(STARTUP_TIMEOUT);
216
tokio::pin!(startup_deadline);
217
218
let mut ready = false;
219
loop {
220
tokio::select! {
221
Ok(Some(l)) = stdout.next_line() => {
222
debug!(self.log, "[{} stdout]: {}", commit_prefix, l);
223
if !ready && l.contains("Agent host server listening on") {
224
ready = true;
225
if let Some(o) = opener.take() {
226
o.open(Ok(socket_path.clone()));
227
}
228
}
229
}
230
Ok(Some(l)) = stderr.next_line() => {
231
debug!(self.log, "[{} stderr]: {}", commit_prefix, l);
232
}
233
_ = &mut startup_deadline, if !ready => {
234
warning!(self.log, "[{}]: Server did not become ready within {}s", commit_prefix, STARTUP_TIMEOUT.as_secs());
235
// Don't fail — the server may still start up, just slowly
236
if let Some(o) = opener.take() {
237
o.open(Ok(socket_path.clone()));
238
}
239
ready = true;
240
}
241
e = child.wait() => {
242
info!(self.log, "[{} process]: exited: {:?}", commit_prefix, e);
243
if let Some(o) = opener.take() {
244
o.open(Err(format!("Server exited before ready: {e:?}")));
245
}
246
// Child has already exited; don't store it in `running`,
247
// otherwise the manager would be wedged with a dead child
248
// forever and ensure_server() would never restart.
249
return;
250
}
251
}
252
253
if ready {
254
break;
255
}
256
}
257
258
// Store the running server state
259
{
260
let mut running = self.running.lock().await;
261
*running = Some(RunningServer {
262
child,
263
commit: release.commit.clone(),
264
});
265
}
266
267
info!(self.log, "[{}]: Server ready", commit_prefix);
268
269
// Continue reading output until the process exits
270
let log = self.log.clone();
271
let commit_prefix = commit_prefix.to_string();
272
let self_clone = self.clone();
273
tokio::spawn(async move {
274
loop {
275
tokio::select! {
276
Ok(Some(l)) = stdout.next_line() => {
277
debug!(log, "[{} stdout]: {}", commit_prefix, l);
278
}
279
Ok(Some(l)) = stderr.next_line() => {
280
debug!(log, "[{} stderr]: {}", commit_prefix, l);
281
}
282
else => break,
283
}
284
}
285
286
// Server process has exited (auto-shutdown or crash)
287
info!(log, "[{}]: Server process ended", commit_prefix);
288
let mut running = self_clone.running.lock().await;
289
if let Some(r) = &*running {
290
if r.commit == commit_prefix || r.commit.starts_with(&commit_prefix) {
291
*running = None;
292
}
293
}
294
});
295
}
296
297
/// Returns a release and its local directory. Prefers the latest known
298
/// release if it has already been downloaded; otherwise falls back to any
299
/// cached version. Only fetches from the network and downloads if
300
/// nothing is cached at all.
301
async fn get_cached_or_download(&self) -> Result<(Release, PathBuf), CodeError> {
302
// When using a dev override, skip the update service entirely -
303
// the override path is used directly by run_server().
304
if option_env!("VSCODE_CLI_OVERRIDE_SERVER_PATH").is_some() {
305
let release = Release {
306
name: String::new(),
307
commit: String::from("dev"),
308
platform: self.platform,
309
target: TargetKind::Server,
310
quality: Quality::Insiders,
311
};
312
return Ok((release, PathBuf::new()));
313
}
314
315
// Best case: the latest known release is already downloaded
316
if let Some((_, release)) = &*self.latest_release.lock().await {
317
let name = get_server_folder_name(release.quality, &release.commit);
318
if let Some(dir) = self.cache.exists(&name) {
319
return Ok((release.clone(), dir));
320
}
321
}
322
323
let quality = VSCODE_CLI_QUALITY
324
.ok_or(CodeError::UpdatesNotConfigured("no configured quality"))
325
.and_then(|q| {
326
Quality::try_from(q).map_err(|_| CodeError::UpdatesNotConfigured("unknown quality"))
327
})?;
328
329
// Fall back to any cached version (still instant, just not the newest).
330
// Cache entries are named "<quality>-<commit>" via get_server_folder_name.
331
for entry in self.cache.get() {
332
if let Some(dir) = self.cache.exists(&entry) {
333
let (entry_quality, commit) = match entry.split_once('-') {
334
Some((q, c)) => match Quality::try_from(q.to_lowercase().as_str()) {
335
Ok(parsed) => (parsed, c.to_string()),
336
Err(_) => (quality, entry.clone()),
337
},
338
None => (quality, entry.clone()),
339
};
340
let release = Release {
341
name: String::new(),
342
commit,
343
platform: self.platform,
344
target: TargetKind::Server,
345
quality: entry_quality,
346
};
347
return Ok((release, dir));
348
}
349
}
350
351
// Nothing cached — must fetch and download (blocks the first connection)
352
info!(self.log, "No cached server version, downloading latest...");
353
let release = self.get_latest_release().await?;
354
let dir = self.ensure_downloaded(&release).await?;
355
Ok((release, dir))
356
}
357
358
/// Ensures the release is downloaded, returning the server directory.
359
pub async fn ensure_downloaded(&self, release: &Release) -> Result<PathBuf, CodeError> {
360
let cache_name = get_server_folder_name(release.quality, &release.commit);
361
if let Some(dir) = self.cache.exists(&cache_name) {
362
return Ok(dir);
363
}
364
365
info!(self.log, "Downloading server {}", release.commit);
366
let release = release.clone();
367
let log = self.log.clone();
368
let update_service = self.update_service.clone();
369
self.cache
370
.create(&cache_name, |target_dir| async move {
371
let tmpdir = tempfile::tempdir().unwrap();
372
let response = update_service.get_download_stream(&release).await?;
373
let name = response.url_path_basename().unwrap();
374
let archive_path = tmpdir.path().join(name);
375
http::download_into_file(
376
&archive_path,
377
log.get_download_logger("Downloading server:"),
378
response,
379
)
380
.await?;
381
let server_dir = target_dir.join(SERVER_FOLDER_NAME);
382
unzip_downloaded_release(&archive_path, &server_dir, SilentCopyProgress())?;
383
Ok(())
384
})
385
.await
386
.map_err(|e| CodeError::ServerDownloadError(e.to_string()))
387
}
388
389
/// Gets the latest release, caching the result.
390
pub async fn get_latest_release(&self) -> Result<Release, CodeError> {
391
let mut latest = self.latest_release.lock().await;
392
let now = Instant::now();
393
394
let quality = VSCODE_CLI_QUALITY
395
.ok_or(CodeError::UpdatesNotConfigured("no configured quality"))
396
.and_then(|q| {
397
Quality::try_from(q).map_err(|_| CodeError::UpdatesNotConfigured("unknown quality"))
398
})?;
399
400
let result = self
401
.update_service
402
.get_latest_commit(self.platform, TargetKind::Server, quality)
403
.await
404
.map_err(|e| CodeError::UpdateCheckFailed(e.to_string()));
405
406
// If the update service is unavailable, fall back to the cached version
407
if let (Err(e), Some((_, previous))) = (&result, latest.clone()) {
408
warning!(self.log, "Error checking for updates, using cached: {}", e);
409
*latest = Some((now, previous.clone()));
410
return Ok(previous);
411
}
412
413
let release = result?;
414
debug!(self.log, "Resolved server version: {}", release);
415
*latest = Some((now, release.clone()));
416
Ok(release)
417
}
418
419
/// Background loop: checks for updates periodically and pre-downloads
420
/// new versions when the server is idle.
421
pub async fn run_update_loop(self: Arc<Self>) {
422
let mut interval = tokio::time::interval(UPDATE_CHECK_INTERVAL);
423
interval.tick().await; // skip the immediate first tick
424
425
loop {
426
interval.tick().await;
427
428
let new_release = match self.get_latest_release().await {
429
Ok(r) => r,
430
Err(e) => {
431
warning!(self.log, "Update check failed: {}", e);
432
continue;
433
}
434
};
435
436
// Check if we already have this version
437
let name = get_server_folder_name(new_release.quality, &new_release.commit);
438
if self.cache.exists(&name).is_some() {
439
continue;
440
}
441
442
info!(self.log, "New server version available: {}", new_release);
443
444
// Wait until the server is not running before downloading
445
loop {
446
{
447
let running = self.running.lock().await;
448
if running.is_none() {
449
break;
450
}
451
}
452
debug!(self.log, "Server still running, waiting before updating...");
453
tokio::time::sleep(UPDATE_POLL_INTERVAL).await;
454
}
455
456
// Download the new version
457
match self.ensure_downloaded(&new_release).await {
458
Ok(_) => info!(self.log, "Updated server to {}", new_release),
459
Err(e) => warning!(self.log, "Failed to download update: {}", e),
460
}
461
}
462
}
463
464
/// Kills the currently running server, if any.
465
pub async fn kill_running_server(&self) {
466
let mut running = self.running.lock().await;
467
if let Some(mut server) = running.take() {
468
let _ = server.child.kill().await;
469
}
470
}
471
}
472
473
// ---- HTTP/WebSocket proxy ---------------------------------------------------
474
475
/// Proxies an incoming HTTP/WebSocket request to the agent host's Unix socket.
476
pub async fn handle_request(
477
manager: Arc<AgentHostManager>,
478
req: Request<Incoming>,
479
) -> Result<Response<HyperBody>, Infallible> {
480
let socket_path = match manager.ensure_server().await {
481
Ok(p) => p,
482
Err(e) => {
483
error!(manager.log, "Error starting agent host: {:?}", e);
484
return Ok(Response::builder()
485
.status(503)
486
.body(full_body(format!("Error starting agent host: {e:?}")))
487
.unwrap());
488
}
489
};
490
491
let is_upgrade = req.headers().contains_key(::http::header::UPGRADE);
492
493
let rw = match get_socket_rw_stream(&socket_path).await {
494
Ok(rw) => rw,
495
Err(e) => {
496
error!(
497
manager.log,
498
"Error connecting to agent host socket: {:?}", e
499
);
500
return Ok(Response::builder()
501
.status(503)
502
.body(full_body(format!("Error connecting to agent host: {e:?}")))
503
.unwrap());
504
}
505
};
506
507
if is_upgrade {
508
Ok(forward_ws_to_server(manager.log.clone(), rw, req).await)
509
} else {
510
Ok(forward_http_to_server(rw, req).await)
511
}
512
}
513
514
/// Proxies a standard HTTP request through the socket.
515
async fn forward_http_to_server(rw: AsyncPipe, req: Request<Incoming>) -> Response<HyperBody> {
516
let (mut request_sender, connection) =
517
match hyper::client::conn::http1::handshake(TokioIo::new(rw)).await {
518
Ok(r) => r,
519
Err(e) => return connection_err(e),
520
};
521
522
tokio::spawn(connection);
523
524
match request_sender.send_request(req).await {
525
Ok(res) => res.map(|b| b.boxed()),
526
Err(e) => connection_err(e),
527
}
528
}
529
530
/// Proxies a WebSocket upgrade request through the socket.
531
async fn forward_ws_to_server(
532
log: log::Logger,
533
rw: AsyncPipe,
534
mut req: Request<Incoming>,
535
) -> Response<HyperBody> {
536
let (mut request_sender, connection) =
537
match hyper::client::conn::http1::handshake(TokioIo::new(rw)).await {
538
Ok(r) => r,
539
Err(e) => return connection_err(e),
540
};
541
542
tokio::spawn(connection.with_upgrades());
543
544
let mut proxied_req = Request::builder().uri(req.uri());
545
for (k, v) in req.headers() {
546
proxied_req = proxied_req.header(k, v);
547
}
548
549
let mut res = match request_sender
550
.send_request(
551
proxied_req
552
.body(http_body_util::Empty::<bytes::Bytes>::new())
553
.unwrap(),
554
)
555
.await
556
{
557
Ok(r) => r,
558
Err(e) => return connection_err(e),
559
};
560
561
let mut proxied_res = Response::new(empty_body());
562
*proxied_res.status_mut() = res.status();
563
for (k, v) in res.headers() {
564
proxied_res.headers_mut().insert(k, v.clone());
565
}
566
567
if res.status() == ::http::StatusCode::SWITCHING_PROTOCOLS {
568
tokio::spawn(async move {
569
let (s_req, s_res) =
570
tokio::join!(hyper::upgrade::on(&mut req), hyper::upgrade::on(&mut res));
571
572
match (s_req, s_res) {
573
(Ok(s_req), Ok(s_res)) => {
574
let mut s_req = TokioIo::new(s_req);
575
let mut s_res = TokioIo::new(s_res);
576
if let Err(e) = tokio::io::copy_bidirectional(&mut s_req, &mut s_res).await {
577
debug!(log, "Agent host WebSocket proxy ended with error: {:?}", e);
578
}
579
}
580
(Err(e), _) => {
581
warning!(
582
log,
583
"Agent host client-side WebSocket upgrade failed: {:?}",
584
e
585
);
586
}
587
(_, Err(e)) => {
588
warning!(
589
log,
590
"Agent host server-side WebSocket upgrade failed: {:?}",
591
e
592
);
593
}
594
}
595
});
596
}
597
598
proxied_res
599
}
600
601
fn connection_err(err: hyper::Error) -> Response<HyperBody> {
602
Response::builder()
603
.status(503)
604
.body(full_body(format!(
605
"Error connecting to agent host: {err:?}"
606
)))
607
.unwrap()
608
}
609
610