Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-proxy/pkg/sshproxy/forward.go
2500 views
1
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package sshproxy
6
7
import (
8
"io"
9
"sync"
10
"time"
11
12
"github.com/gitpod-io/gitpod/common-go/analytics"
13
"github.com/gitpod-io/gitpod/common-go/log"
14
tracker "github.com/gitpod-io/gitpod/ws-proxy/pkg/analytics"
15
"github.com/gitpod-io/golang-crypto/ssh"
16
"golang.org/x/net/context"
17
)
18
19
func (s *Server) ChannelForward(ctx context.Context, session *Session, targetConn ssh.Conn, originChannel ssh.NewChannel) {
20
targetChan, targetReqs, err := targetConn.OpenChannel(originChannel.ChannelType(), originChannel.ExtraData())
21
if err != nil {
22
log.WithFields(log.OWI("", session.WorkspaceID, session.InstanceID)).WithError(err).Error("open target channel error")
23
_ = originChannel.Reject(ssh.ConnectionFailed, "open target channel error")
24
return
25
}
26
defer targetChan.Close()
27
28
originChan, originReqs, err := originChannel.Accept()
29
if err != nil {
30
log.WithFields(log.OWI("", session.WorkspaceID, session.InstanceID)).WithError(err).Error("accept origin channel failed")
31
return
32
}
33
if originChannel.ChannelType() == "session" {
34
originChan = startHeartbeatingChannel(originChan, s.Heartbeater, session)
35
}
36
defer originChan.Close()
37
38
maskedReqs := make(chan *ssh.Request, 1)
39
40
go func() {
41
for req := range originReqs {
42
switch req.Type {
43
case "pty-req", "shell":
44
log.WithFields(log.OWI("", session.WorkspaceID, session.InstanceID)).Debugf("forwarding %s request", req.Type)
45
if channel, ok := originChan.(*heartbeatingChannel); ok && req.Type == "pty-req" {
46
channel.mux.Lock()
47
channel.requestedPty = true
48
channel.mux.Unlock()
49
}
50
}
51
maskedReqs <- req
52
}
53
close(maskedReqs)
54
}()
55
56
originChannelWg := sync.WaitGroup{}
57
originChannelWg.Add(3)
58
targetChannelWg := sync.WaitGroup{}
59
targetChannelWg.Add(3)
60
61
wg := sync.WaitGroup{}
62
wg.Add(2)
63
64
go func() {
65
defer wg.Done()
66
_, _ = io.Copy(targetChan, originChan)
67
_ = targetChan.CloseWrite()
68
targetChannelWg.Done()
69
targetChannelWg.Wait()
70
_ = targetChan.Close()
71
}()
72
73
go func() {
74
defer wg.Done()
75
_, _ = io.Copy(originChan, targetChan)
76
_ = originChan.CloseWrite()
77
originChannelWg.Done()
78
originChannelWg.Wait()
79
_ = originChan.Close()
80
}()
81
82
go func() {
83
_, _ = io.Copy(targetChan.Stderr(), originChan.Stderr())
84
targetChannelWg.Done()
85
}()
86
87
go func() {
88
_, _ = io.Copy(originChan.Stderr(), targetChan.Stderr())
89
originChannelWg.Done()
90
}()
91
92
forward := func(sourceReqs <-chan *ssh.Request, targetChan ssh.Channel, channelWg *sync.WaitGroup) {
93
defer channelWg.Done()
94
for ctx.Err() == nil {
95
select {
96
case req, ok := <-sourceReqs:
97
if !ok {
98
return
99
}
100
b, err := targetChan.SendRequest(req.Type, req.WantReply, req.Payload)
101
_ = req.Reply(b, nil)
102
if err != nil {
103
return
104
}
105
case <-ctx.Done():
106
return
107
}
108
}
109
}
110
111
go forward(maskedReqs, targetChan, &targetChannelWg)
112
go forward(targetReqs, originChan, &originChannelWg)
113
114
wg.Wait()
115
log.WithFields(log.OWI("", session.WorkspaceID, session.InstanceID)).Debug("session forward stop")
116
}
117
118
func TrackIDECloseSignal(session *Session) {
119
propertics := make(map[string]interface{})
120
propertics["workspaceId"] = session.WorkspaceID
121
propertics["instanceId"] = session.InstanceID
122
propertics["clientKind"] = "ssh"
123
tracker.Track(analytics.TrackMessage{
124
Identity: analytics.Identity{UserID: session.OwnerUserId},
125
Event: "ide_close_signal",
126
Properties: propertics,
127
})
128
}
129
130
func startHeartbeatingChannel(c ssh.Channel, heartbeat Heartbeat, session *Session) ssh.Channel {
131
ctx, cancel := context.WithCancel(context.Background())
132
res := &heartbeatingChannel{
133
Channel: c,
134
t: time.NewTicker(30 * time.Second),
135
cancel: cancel,
136
}
137
go func() {
138
for {
139
select {
140
case <-res.t.C:
141
res.mux.Lock()
142
if !res.sawActivity || !res.requestedPty {
143
res.mux.Unlock()
144
continue
145
}
146
res.sawActivity = false
147
res.mux.Unlock()
148
heartbeat.SendHeartbeat(session.InstanceID, false, false)
149
case <-ctx.Done():
150
if res.requestedPty {
151
heartbeat.SendHeartbeat(session.InstanceID, true, false)
152
TrackIDECloseSignal(session)
153
log.WithField("instanceId", session.InstanceID).Info("send closed heartbeat")
154
}
155
return
156
}
157
}
158
}()
159
160
return res
161
}
162
163
type heartbeatingChannel struct {
164
ssh.Channel
165
166
mux sync.Mutex
167
sawActivity bool
168
t *time.Ticker
169
170
cancel context.CancelFunc
171
172
requestedPty bool
173
}
174
175
// Read reads up to len(data) bytes from the channel.
176
func (c *heartbeatingChannel) Read(data []byte) (written int, err error) {
177
written, err = c.Channel.Read(data)
178
if err == nil && written != 0 {
179
c.mux.Lock()
180
c.sawActivity = true
181
c.mux.Unlock()
182
}
183
return
184
}
185
186
// Close stops the heartbeat ticker, cancels the heartbeat goroutine's context
// and then closes the underlying channel, returning that channel's error.
func (c *heartbeatingChannel) Close() error {
	// Shut the heartbeat machinery down before closing the wrapped channel.
	c.t.Stop()
	c.cancel()

	closeErr := c.Channel.Close()
	return closeErr
}
191
192