Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/tunnels/server_multiplexer.rs
3314 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
use std::sync::Arc;
7
8
use futures::future::join_all;
9
10
use crate::log;
11
12
use super::server_bridge::ServerBridge;
13
14
type Inner = Arc<std::sync::Mutex<Option<Vec<ServerBridgeRec>>>>;
15
16
struct ServerBridgeRec {
17
id: u16,
18
// bridge is removed when there's a write loop currently active
19
bridge: Option<ServerBridge>,
20
write_queue: Vec<Vec<u8>>,
21
}
22
23
/// The ServerMultiplexer manages multiple server bridges and allows writing
24
/// to them in a thread-safe way. It is copy, sync, and clone.
25
#[derive(Clone)]
26
pub struct ServerMultiplexer {
27
inner: Inner,
28
}
29
30
impl ServerMultiplexer {
31
pub fn new() -> Self {
32
Self {
33
inner: Arc::new(std::sync::Mutex::new(Some(Vec::new()))),
34
}
35
}
36
37
/// Adds a new bridge to the multiplexer.
38
pub fn register(&self, id: u16, bridge: ServerBridge) {
39
let bridge_rec = ServerBridgeRec {
40
id,
41
bridge: Some(bridge),
42
write_queue: vec![],
43
};
44
45
let mut lock = self.inner.lock().unwrap();
46
match &mut *lock {
47
Some(server_bridges) => (*server_bridges).push(bridge_rec),
48
None => *lock = Some(vec![bridge_rec]),
49
}
50
}
51
52
/// Removes a server bridge by ID.
53
pub fn remove(&self, id: u16) {
54
let mut lock = self.inner.lock().unwrap();
55
if let Some(bridges) = &mut *lock {
56
bridges.retain(|sb| sb.id != id);
57
}
58
}
59
60
/// Handle an incoming server message. This is synchronous and uses a 'write loop'
61
/// to ensure message order is preserved exactly, which is necessary for compression.
62
/// Returns false if there was no server with the given bridge_id.
63
pub fn write_message(&self, log: &log::Logger, bridge_id: u16, message: Vec<u8>) -> bool {
64
let mut lock = self.inner.lock().unwrap();
65
66
let bridges = match &mut *lock {
67
Some(sb) => sb,
68
None => return false,
69
};
70
71
let record = match bridges.iter_mut().find(|b| b.id == bridge_id) {
72
Some(sb) => sb,
73
None => return false,
74
};
75
76
record.write_queue.push(message);
77
if let Some(bridge) = record.bridge.take() {
78
let bridges_lock = self.inner.clone();
79
let log = log.clone();
80
tokio::spawn(write_loop(log, record.id, bridge, bridges_lock));
81
}
82
83
true
84
}
85
86
/// Disposes all running server bridges.
87
pub async fn dispose(&self) {
88
let bridges = {
89
let mut lock = self.inner.lock().unwrap();
90
lock.take()
91
};
92
93
let bridges = match bridges {
94
Some(b) => b,
95
None => return,
96
};
97
98
join_all(
99
bridges
100
.into_iter()
101
.filter_map(|b| b.bridge)
102
.map(|b| b.close()),
103
)
104
.await;
105
}
106
}
107
108
/// Write loop started by `handle_server_message`. It takes the ServerBridge, and
109
/// runs until there's no more items in the 'write queue'. At that point, if the
110
/// record still exists in the bridges_lock (i.e. we haven't shut down), it'll
111
/// return the ServerBridge so that the next handle_server_message call starts
112
/// the loop again. Otherwise, it'll close the bridge.
113
async fn write_loop(log: log::Logger, id: u16, mut bridge: ServerBridge, bridges_lock: Inner) {
114
let mut items_vec = vec![];
115
loop {
116
{
117
let mut lock = bridges_lock.lock().unwrap();
118
let server_bridges = match &mut *lock {
119
Some(sb) => sb,
120
None => break,
121
};
122
123
let bridge_rec = match server_bridges.iter_mut().find(|b| id == b.id) {
124
Some(b) => b,
125
None => break,
126
};
127
128
if bridge_rec.write_queue.is_empty() {
129
bridge_rec.bridge = Some(bridge);
130
return;
131
}
132
133
std::mem::swap(&mut bridge_rec.write_queue, &mut items_vec);
134
}
135
136
for item in items_vec.drain(..) {
137
if let Err(e) = bridge.write(item).await {
138
warning!(log, "Error writing to server: {:?}", e);
139
break;
140
}
141
}
142
}
143
144
bridge.close().await.ok(); // got here from `break` above, meaning our record got cleared. Close the bridge if so
145
}
146
147