Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/tunnels/singleton_server.rs
3314 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/
5
6
use std::{
7
pin::Pin,
8
sync::{Arc, Mutex},
9
};
10
11
use super::{
12
code_server::CodeServerArgs,
13
control_server::ServerTermination,
14
dev_tunnels::{ActiveTunnel, StatusLock},
15
protocol,
16
shutdown_signal::{ShutdownRequest, ShutdownSignal},
17
};
18
use crate::{
19
async_pipe::socket_stream_split,
20
json_rpc::{new_json_rpc, start_json_rpc, JsonRpcSerializer},
21
log,
22
rpc::{RpcCaller, RpcDispatcher},
23
singleton::SingletonServer,
24
state::LauncherPaths,
25
tunnels::code_server::print_listening,
26
update_service::Platform,
27
util::{
28
errors::{AnyError, CodeError},
29
ring_buffer::RingBuffer,
30
sync::{Barrier, ConcatReceivable},
31
},
32
};
33
use futures::future::Either;
34
use tokio::{
35
pin,
36
sync::{broadcast, mpsc},
37
task::JoinHandle,
38
};
39
40
pub struct SingletonServerArgs<'a> {
41
pub server: &'a mut RpcServer,
42
pub log: log::Logger,
43
pub tunnel: ActiveTunnel,
44
pub paths: &'a LauncherPaths,
45
pub code_server_args: &'a CodeServerArgs,
46
pub platform: Platform,
47
pub shutdown: Barrier<ShutdownSignal>,
48
pub log_broadcast: &'a BroadcastLogSink,
49
}
50
51
struct StatusInfo {
52
name: String,
53
lock: StatusLock,
54
}
55
56
#[derive(Clone)]
57
struct SingletonServerContext {
58
log: log::Logger,
59
shutdown_tx: broadcast::Sender<ShutdownSignal>,
60
broadcast_tx: broadcast::Sender<Vec<u8>>,
61
// ugly: a lock in a lock. current_status needs to be provided only
62
// after we set up the tunnel, however the tunnel is created after the
63
// singleton server starts to avoid a gap in singleton availability.
64
// However, this should be safe, as the lock is only used for immediate
65
// data reads (in the `status` method).
66
current_status: Arc<Mutex<Option<StatusInfo>>>,
67
}
68
69
pub struct RpcServer {
70
fut: JoinHandle<Result<(), CodeError>>,
71
shutdown_broadcast: broadcast::Sender<ShutdownSignal>,
72
current_status: Arc<Mutex<Option<StatusInfo>>>,
73
}
74
75
pub fn make_singleton_server(
76
log_broadcast: BroadcastLogSink,
77
log: log::Logger,
78
server: SingletonServer,
79
shutdown_rx: Barrier<ShutdownSignal>,
80
) -> RpcServer {
81
let (shutdown_broadcast, _) = broadcast::channel(4);
82
let rpc = new_json_rpc();
83
84
let current_status = Arc::new(Mutex::default());
85
let mut rpc = rpc.methods(SingletonServerContext {
86
log: log.clone(),
87
shutdown_tx: shutdown_broadcast.clone(),
88
broadcast_tx: log_broadcast.get_brocaster(),
89
current_status: current_status.clone(),
90
});
91
92
rpc.register_sync(
93
protocol::singleton::METHOD_RESTART,
94
|_: protocol::EmptyObject, ctx| {
95
info!(ctx.log, "restarting tunnel after client request");
96
let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcRestartRequested);
97
Ok(())
98
},
99
);
100
101
rpc.register_sync(
102
protocol::singleton::METHOD_STATUS,
103
|_: protocol::EmptyObject, c| {
104
Ok(c.current_status
105
.lock()
106
.unwrap()
107
.as_ref()
108
.map(|s| protocol::singleton::StatusWithTunnelName {
109
name: Some(s.name.clone()),
110
status: s.lock.read(),
111
})
112
.unwrap_or_default())
113
},
114
);
115
116
rpc.register_sync(
117
protocol::singleton::METHOD_SHUTDOWN,
118
|_: protocol::EmptyObject, ctx| {
119
info!(
120
ctx.log,
121
"closing tunnel and all clients after a shutdown request"
122
);
123
let _ = ctx.broadcast_tx.send(RpcCaller::serialize_notify(
124
&JsonRpcSerializer {},
125
protocol::singleton::METHOD_SHUTDOWN,
126
protocol::EmptyObject {},
127
));
128
let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcShutdownRequested);
129
Ok(())
130
},
131
);
132
133
// we tokio spawn instead of keeping a future, since we want it to progress
134
// even outside of the start_singleton_server loop (i.e. while the tunnel restarts)
135
let fut = tokio::spawn(async move {
136
serve_singleton_rpc(log_broadcast, server, rpc.build(log), shutdown_rx).await
137
});
138
RpcServer {
139
shutdown_broadcast,
140
current_status,
141
fut,
142
}
143
}
144
145
pub async fn start_singleton_server(
146
args: SingletonServerArgs<'_>,
147
) -> Result<ServerTermination, AnyError> {
148
let shutdown_rx = ShutdownRequest::create_rx([
149
ShutdownRequest::Derived(Box::new(args.server.shutdown_broadcast.subscribe())),
150
ShutdownRequest::Derived(Box::new(args.shutdown.clone())),
151
]);
152
153
{
154
print_listening(&args.log, &args.tunnel.name);
155
let mut status = args.server.current_status.lock().unwrap();
156
*status = Some(StatusInfo {
157
name: args.tunnel.name.clone(),
158
lock: args.tunnel.status(),
159
})
160
}
161
162
let serve_fut = super::serve(
163
&args.log,
164
args.tunnel,
165
args.paths,
166
args.code_server_args,
167
args.platform,
168
shutdown_rx,
169
);
170
171
pin!(serve_fut);
172
173
match futures::future::select(Pin::new(&mut args.server.fut), &mut serve_fut).await {
174
Either::Left((rpc_result, fut)) => {
175
// the rpc server will only end as a result of a graceful shutdown, or
176
// with an error. Return the result of the eventual shutdown of the
177
// control server.
178
rpc_result.unwrap()?;
179
fut.await
180
}
181
Either::Right((ctrl_result, _)) => ctrl_result,
182
}
183
}
184
185
async fn serve_singleton_rpc<C: Clone + Send + Sync + 'static>(
186
log_broadcast: BroadcastLogSink,
187
mut server: SingletonServer,
188
dispatcher: RpcDispatcher<JsonRpcSerializer, C>,
189
shutdown_rx: Barrier<ShutdownSignal>,
190
) -> Result<(), CodeError> {
191
let mut own_shutdown = shutdown_rx.clone();
192
let shutdown_fut = own_shutdown.wait();
193
pin!(shutdown_fut);
194
195
loop {
196
let cnx = tokio::select! {
197
c = server.accept() => c?,
198
_ = &mut shutdown_fut => return Ok(()),
199
};
200
201
let (read, write) = socket_stream_split(cnx);
202
let dispatcher = dispatcher.clone();
203
let msg_rx = log_broadcast.replay_and_subscribe();
204
let shutdown_rx = shutdown_rx.clone();
205
tokio::spawn(async move {
206
let _ = start_json_rpc(dispatcher.clone(), read, write, msg_rx, shutdown_rx).await;
207
});
208
}
209
}
210
211
/// Log sink that can broadcast and replay log events. Used for transmitting
212
/// logs from the singleton to all clients. This should be created and injected
213
/// into other services, like the tunnel, before `start_singleton_server`
214
/// is called.
215
#[derive(Clone)]
216
pub struct BroadcastLogSink {
217
recent: Arc<Mutex<RingBuffer<Vec<u8>>>>,
218
tx: broadcast::Sender<Vec<u8>>,
219
}
220
221
impl Default for BroadcastLogSink {
222
fn default() -> Self {
223
Self::new()
224
}
225
}
226
227
impl BroadcastLogSink {
228
pub fn new() -> Self {
229
let (tx, _) = broadcast::channel(64);
230
Self {
231
tx,
232
recent: Arc::new(Mutex::new(RingBuffer::new(50))),
233
}
234
}
235
236
pub fn get_brocaster(&self) -> broadcast::Sender<Vec<u8>> {
237
self.tx.clone()
238
}
239
240
fn replay_and_subscribe(
241
&self,
242
) -> ConcatReceivable<Vec<u8>, mpsc::UnboundedReceiver<Vec<u8>>, broadcast::Receiver<Vec<u8>>>
243
{
244
let (log_replay_tx, log_replay_rx) = mpsc::unbounded_channel();
245
246
for log in self.recent.lock().unwrap().iter() {
247
let _ = log_replay_tx.send(log.clone());
248
}
249
250
let _ = log_replay_tx.send(RpcCaller::serialize_notify(
251
&JsonRpcSerializer {},
252
protocol::singleton::METHOD_LOG_REPLY_DONE,
253
protocol::EmptyObject {},
254
));
255
256
ConcatReceivable::new(log_replay_rx, self.tx.subscribe())
257
}
258
}
259
260
impl log::LogSink for BroadcastLogSink {
261
fn write_log(&self, level: log::Level, prefix: &str, message: &str) {
262
let s = JsonRpcSerializer {};
263
let serialized = RpcCaller::serialize_notify(
264
&s,
265
protocol::singleton::METHOD_LOG,
266
protocol::singleton::LogMessage {
267
level: Some(level),
268
prefix,
269
message,
270
},
271
);
272
273
let _ = self.tx.send(serialized.clone());
274
self.recent.lock().unwrap().push(serialized);
275
}
276
277
fn write_result(&self, message: &str) {
278
self.write_log(log::Level::Info, "", message);
279
}
280
}
281
282