Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/singleton.rs
3309 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
use serde::{Deserialize, Serialize};
7
use std::{
8
fs::{File, OpenOptions},
9
io::{Seek, SeekFrom, Write},
10
path::{Path, PathBuf},
11
time::Duration,
12
};
13
use sysinfo::{Pid, PidExt};
14
15
use crate::{
16
async_pipe::{
17
get_socket_name, get_socket_rw_stream, listen_socket_rw_stream, AsyncPipe,
18
AsyncPipeListener,
19
},
20
util::{
21
errors::CodeError,
22
file_lock::{FileLock, Lock, PREFIX_LOCKED_BYTES},
23
machine::wait_until_process_exits,
24
},
25
};
26
27
pub struct SingletonServer {
28
server: AsyncPipeListener,
29
_lock: FileLock,
30
}
31
32
impl SingletonServer {
33
pub async fn accept(&mut self) -> Result<AsyncPipe, CodeError> {
34
self.server.accept().await
35
}
36
}
37
38
pub enum SingletonConnection {
39
/// This instance got the singleton lock. It started listening on a socket
40
/// and has the read/write pair. If this gets dropped, the lock is released.
41
Singleton(SingletonServer),
42
/// Another instance is a singleton, and this client connected to it.
43
Client(AsyncPipe),
44
}
45
46
/// Contents of the lock file; the listening socket ID and process ID
47
/// doing the listening.
48
#[derive(Deserialize, Serialize)]
49
struct LockFileMatter {
50
socket_path: String,
51
pid: u32,
52
}
53
54
/// Tries to acquire the singleton homed at the given lock file, either starting
55
/// a new singleton if it doesn't exist, or connecting otherwise.
56
pub async fn acquire_singleton(lock_file: &Path) -> Result<SingletonConnection, CodeError> {
57
let file = OpenOptions::new()
58
.read(true)
59
.write(true)
60
.create(true)
61
.truncate(false)
62
.open(lock_file)
63
.map_err(CodeError::SingletonLockfileOpenFailed)?;
64
65
match FileLock::acquire(file) {
66
Ok(Lock::AlreadyLocked(mut file)) => connect_as_client_with_file(&mut file)
67
.await
68
.map(SingletonConnection::Client),
69
Ok(Lock::Acquired(lock)) => start_singleton_server(lock)
70
.await
71
.map(SingletonConnection::Singleton),
72
Err(e) => Err(e),
73
}
74
}
75
76
/// Tries to connect to the singleton homed at the given file as a client.
77
pub async fn connect_as_client(lock_file: &Path) -> Result<AsyncPipe, CodeError> {
78
let mut file = OpenOptions::new()
79
.read(true)
80
.open(lock_file)
81
.map_err(CodeError::SingletonLockfileOpenFailed)?;
82
83
connect_as_client_with_file(&mut file).await
84
}
85
86
async fn start_singleton_server(mut lock: FileLock) -> Result<SingletonServer, CodeError> {
87
let socket_path = get_socket_name();
88
89
let mut vec = Vec::with_capacity(128);
90
let _ = vec.write(&[0; PREFIX_LOCKED_BYTES]);
91
let _ = rmp_serde::encode::write(
92
&mut vec,
93
&LockFileMatter {
94
socket_path: socket_path.to_string_lossy().to_string(),
95
pid: std::process::id(),
96
},
97
);
98
99
lock.file_mut()
100
.write_all(&vec)
101
.map_err(CodeError::SingletonLockfileOpenFailed)?;
102
103
let server = listen_socket_rw_stream(&socket_path).await?;
104
Ok(SingletonServer {
105
server,
106
_lock: lock,
107
})
108
}
109
110
/// Value of the attempt counter at which `connect_as_client_with_file` stops
/// retrying. The counter starts at 0 and is checked after each failed attempt,
/// so up to `MAX_CLIENT_ATTEMPTS + 1` attempts are made in total.
const MAX_CLIENT_ATTEMPTS: i32 = 10;
111
112
async fn connect_as_client_with_file(mut file: &mut File) -> Result<AsyncPipe, CodeError> {
113
// retry, since someone else could get a lock and we could read it before
114
// the JSON info was finished writing out
115
let mut attempt = 0;
116
loop {
117
let _ = file.seek(SeekFrom::Start(PREFIX_LOCKED_BYTES as u64));
118
let r = match rmp_serde::from_read::<_, LockFileMatter>(&mut file) {
119
Ok(prev) => {
120
let socket_path = PathBuf::from(prev.socket_path);
121
122
tokio::select! {
123
p = retry_get_socket_rw_stream(&socket_path, 5, Duration::from_millis(500)) => p,
124
_ = wait_until_process_exits(Pid::from_u32(prev.pid), 500) => return Err(CodeError::SingletonLockedProcessExited(prev.pid)),
125
}
126
}
127
Err(e) => Err(CodeError::SingletonLockfileReadFailed(e)),
128
};
129
130
if r.is_ok() || attempt == MAX_CLIENT_ATTEMPTS {
131
return r;
132
}
133
134
attempt += 1;
135
tokio::time::sleep(Duration::from_millis(500)).await;
136
}
137
}
138
139
async fn retry_get_socket_rw_stream(
140
path: &Path,
141
max_tries: usize,
142
interval: Duration,
143
) -> Result<AsyncPipe, CodeError> {
144
for i in 0.. {
145
match get_socket_rw_stream(path).await {
146
Ok(s) => return Ok(s),
147
Err(e) if i == max_tries => return Err(e),
148
Err(_) => tokio::time::sleep(interval).await,
149
}
150
}
151
152
unreachable!()
153
}
154
155
#[cfg(test)]
mod tests {
    use super::*;

    /// The first acquirer of a fresh lock file must become the singleton.
    #[tokio::test]
    async fn test_acquires_singleton() {
        let tmp = tempfile::tempdir().expect("expected to make temp dir");
        let lock_path = tmp.path().join("lock");
        let conn = acquire_singleton(&lock_path)
            .await
            .expect("expected to acquire");

        assert!(
            matches!(conn, SingletonConnection::Singleton(_)),
            "expected to be singleton"
        );
    }

    /// A second acquirer on the same lock file must end up as a client of the
    /// first while the first is still alive and accepting.
    #[tokio::test]
    async fn test_acquires_client() {
        let tmp = tempfile::tempdir().expect("expected to make temp dir");
        let lockfile = tmp.path().join("lock");

        let first = acquire_singleton(&lockfile)
            .await
            .expect("expected to acquire1");
        let mut server = match first {
            SingletonConnection::Singleton(server) => server,
            SingletonConnection::Client(_) => panic!("expected to be singleton"),
        };
        // Keep the server (and therefore its lock) alive in the background and
        // have it accept the connection made below.
        tokio::spawn(async move {
            server.accept().await.expect("expected to accept");
        });

        let second = acquire_singleton(&lockfile)
            .await
            .expect("expected to acquire2");
        assert!(
            matches!(second, SingletonConnection::Client(_)),
            "expected to be client"
        );
    }
}
195
196