Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/tunnels/port_forwarder.rs
3314 views
1
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/
5
6
use std::collections::HashSet;
7
8
use tokio::sync::{mpsc, oneshot};
9
10
use crate::{
11
constants::CONTROL_PORT,
12
util::errors::{AnyError, CannotForwardControlPort, ServerHasClosed},
13
};
14
15
use super::{
16
dev_tunnels::ActiveTunnel,
17
protocol::{PortPrivacy, PortProtocol},
18
};
19
20
pub enum PortForwardingRec {
21
Forward(u16, PortPrivacy, oneshot::Sender<Result<String, AnyError>>),
22
Unforward(u16, oneshot::Sender<Result<(), AnyError>>),
23
}
24
25
/// Provides a port forwarding service for connected clients. Clients can make
26
/// requests on it, which are (and *must be*) processed by calling the `.process()`
27
/// method on the forwarder.
28
pub struct PortForwardingProcessor {
29
tx: mpsc::Sender<PortForwardingRec>,
30
rx: mpsc::Receiver<PortForwardingRec>,
31
forwarded: HashSet<u16>,
32
}
33
34
impl PortForwardingProcessor {
35
pub fn new() -> Self {
36
let (tx, rx) = mpsc::channel(8);
37
Self {
38
tx,
39
rx,
40
forwarded: HashSet::new(),
41
}
42
}
43
44
/// Gets a handle that can be passed off to consumers of port forwarding.
45
pub fn handle(&self) -> PortForwarding {
46
PortForwarding {
47
tx: self.tx.clone(),
48
}
49
}
50
51
/// Receives port forwarding requests. Consumers MUST call `process()`
52
/// with the received requests.
53
pub async fn recv(&mut self) -> Option<PortForwardingRec> {
54
self.rx.recv().await
55
}
56
57
/// Processes the incoming forwarding request.
58
pub async fn process(&mut self, req: PortForwardingRec, tunnel: &mut ActiveTunnel) {
59
match req {
60
PortForwardingRec::Forward(port, privacy, tx) => {
61
tx.send(self.process_forward(port, privacy, tunnel).await)
62
.ok();
63
}
64
PortForwardingRec::Unforward(port, tx) => {
65
tx.send(self.process_unforward(port, tunnel).await).ok();
66
}
67
}
68
}
69
70
async fn process_unforward(
71
&mut self,
72
port: u16,
73
tunnel: &mut ActiveTunnel,
74
) -> Result<(), AnyError> {
75
if port == CONTROL_PORT {
76
return Err(CannotForwardControlPort().into());
77
}
78
79
tunnel.remove_port(port).await?;
80
self.forwarded.remove(&port);
81
Ok(())
82
}
83
84
async fn process_forward(
85
&mut self,
86
port: u16,
87
privacy: PortPrivacy,
88
tunnel: &mut ActiveTunnel,
89
) -> Result<String, AnyError> {
90
if port == CONTROL_PORT {
91
return Err(CannotForwardControlPort().into());
92
}
93
94
if !self.forwarded.contains(&port) {
95
tunnel
96
.add_port_tcp(port, privacy, PortProtocol::Auto)
97
.await?;
98
self.forwarded.insert(port);
99
}
100
101
tunnel.get_port_uri(port)
102
}
103
}
104
105
#[derive(Clone)]
106
pub struct PortForwarding {
107
tx: mpsc::Sender<PortForwardingRec>,
108
}
109
110
impl PortForwarding {
111
pub async fn forward(&self, port: u16, privacy: PortPrivacy) -> Result<String, AnyError> {
112
let (tx, rx) = oneshot::channel();
113
let req = PortForwardingRec::Forward(port, privacy, tx);
114
115
if self.tx.send(req).await.is_err() {
116
return Err(ServerHasClosed().into());
117
}
118
119
match rx.await {
120
Ok(r) => r,
121
Err(_) => Err(ServerHasClosed().into()),
122
}
123
}
124
125
pub async fn unforward(&self, port: u16) -> Result<(), AnyError> {
126
let (tx, rx) = oneshot::channel();
127
let req = PortForwardingRec::Unforward(port, tx);
128
129
if self.tx.send(req).await.is_err() {
130
return Err(ServerHasClosed().into());
131
}
132
133
match rx.await {
134
Ok(r) => r,
135
Err(_) => Err(ServerHasClosed().into()),
136
}
137
}
138
}
139
140