Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/netlimit/netlimit.go
2499 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package netlimit
6
7
import (
8
"context"
9
"errors"
10
"fmt"
11
"os"
12
"os/exec"
13
14
"runtime"
15
"strconv"
16
"sync"
17
"time"
18
19
"github.com/gitpod-io/gitpod/common-go/kubernetes"
20
"github.com/gitpod-io/gitpod/common-go/log"
21
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
22
"github.com/gitpod-io/gitpod/ws-daemon/pkg/nsinsider"
23
"github.com/google/nftables"
24
"github.com/prometheus/client_golang/prometheus"
25
"github.com/vishvananda/netns"
26
)
27
28
type ConnLimiter struct {
29
mu sync.RWMutex
30
limited map[string]struct{}
31
droppedBytes *prometheus.GaugeVec
32
droppedPackets *prometheus.GaugeVec
33
config Config
34
}
35
36
func NewConnLimiter(config Config, prom prometheus.Registerer) *ConnLimiter {
37
s := &ConnLimiter{
38
droppedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
39
Name: "netlimit_connections_dropped_bytes",
40
Help: "Number of bytes dropped due to connection limiting",
41
}, []string{"node", "workspace"}),
42
43
droppedPackets: prometheus.NewGaugeVec(prometheus.GaugeOpts{
44
Name: "netlimit_connections_dropped_packets",
45
Help: "Number of packets dropped due to connection limiting",
46
}, []string{"node", "workspace"}),
47
limited: map[string]struct{}{},
48
}
49
50
s.config = config
51
52
if config.Enabled {
53
prom.MustRegister(
54
s.droppedBytes,
55
s.droppedPackets,
56
)
57
}
58
59
return s
60
}
61
62
// WorkspaceAdded sets up connection limiting for ws when its pod carries the
// kubernetes.WorkspaceNetConnLimitAnnotation annotation. Workspaces without
// the annotation are left alone and nil is returned.
func (c *ConnLimiter) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Only the presence of the annotation matters; its value is not read.
	if _, annotated := ws.Pod.Annotations[kubernetes.WorkspaceNetConnLimitAnnotation]; annotated {
		return c.limitWorkspace(ctx, ws)
	}

	return nil
}
73
74
// WorkspaceUpdated sets up connection limiting for ws if its pod carries the
// kubernetes.WorkspaceNetConnLimitAnnotation annotation and the workspace
// instance has not been limited already. In every other case it is a no-op
// that returns nil.
func (c *ConnLimiter) WorkspaceUpdated(ctx context.Context, ws *dispatch.Workspace) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	_, annotated := ws.Pod.Annotations[kubernetes.WorkspaceNetConnLimitAnnotation]
	_, alreadyLimited := c.limited[ws.InstanceID]

	switch {
	case !annotated:
		// Limiting was not requested for this workspace.
		return nil
	case alreadyLimited:
		// The limit is in place; avoid setting it up a second time.
		return nil
	default:
		return c.limitWorkspace(ctx, ws)
	}
}
89
90
func (n *ConnLimiter) GetConnectionDropCounter(pid uint64) (*nftables.CounterObj, error) {
91
runtime.LockOSThread()
92
defer runtime.UnlockOSThread()
93
94
netns, err := netns.GetFromPid(int(pid))
95
if err != nil {
96
return nil, fmt.Errorf("could not get handle for network namespace: %w", err)
97
}
98
99
nftconn, err := nftables.New(nftables.WithNetNSFd(int(netns)))
100
if err != nil {
101
return nil, fmt.Errorf("could not establish netlink connection for nft: %w", err)
102
}
103
104
gitpodTable := &nftables.Table{
105
Name: "gitpod",
106
Family: nftables.TableFamilyIPv4,
107
}
108
109
counterObject, err := nftconn.GetObject(&nftables.CounterObj{
110
Table: gitpodTable,
111
Name: "ws-connection-drop-stats",
112
})
113
114
if err != nil {
115
return nil, fmt.Errorf("could not get connection drop stats: %w", err)
116
}
117
118
dropCounter, ok := counterObject.(*nftables.CounterObj)
119
if !ok {
120
return nil, fmt.Errorf("could not cast counter object")
121
}
122
123
return dropCounter, nil
124
}
125
126
func (c *ConnLimiter) limitWorkspace(ctx context.Context, ws *dispatch.Workspace) error {
127
disp := dispatch.GetFromContext(ctx)
128
if disp == nil {
129
return fmt.Errorf("no dispatch available")
130
}
131
132
pid, err := disp.Runtime.ContainerPID(ctx, ws.ContainerID)
133
if err != nil {
134
if errors.Is(err, context.Canceled) {
135
return nil
136
}
137
return fmt.Errorf("could not get pid for container %s of workspace %s", ws.ContainerID, ws.WorkspaceID)
138
}
139
140
err = nsinsider.Nsinsider(ws.InstanceID, int(pid), func(cmd *exec.Cmd) {
141
cmd.Args = append(cmd.Args, "setup-connection-limit", "--limit", strconv.Itoa(int(c.config.ConnectionsPerMinute)),
142
"--bucketsize", strconv.Itoa(int(c.config.BucketSize)))
143
if c.config.Enforce {
144
cmd.Args = append(cmd.Args, "--enforce")
145
}
146
}, nsinsider.EnterMountNS(false), nsinsider.EnterNetNS(true))
147
if err != nil {
148
if errors.Is(context.Cause(ctx), context.Canceled) {
149
return nil
150
}
151
log.WithError(err).WithFields(ws.OWI()).Error("cannot enable connection limiting")
152
return err
153
}
154
c.limited[ws.InstanceID] = struct{}{}
155
156
dispatch.GetDispatchWaitGroup(ctx).Add(1)
157
go func(*dispatch.Workspace) {
158
defer dispatch.GetDispatchWaitGroup(ctx).Done()
159
160
ticker := time.NewTicker(30 * time.Second)
161
defer ticker.Stop()
162
163
for {
164
select {
165
case <-ticker.C:
166
counter, err := c.GetConnectionDropCounter(pid)
167
if err != nil {
168
log.WithFields(ws.OWI()).WithError(err).Warnf("could not get connection drop stats")
169
continue
170
}
171
172
nodeName := os.Getenv("NODENAME")
173
c.droppedBytes.WithLabelValues(nodeName, ws.Pod.Name).Set(float64(counter.Bytes))
174
c.droppedPackets.WithLabelValues(nodeName, ws.Pod.Name).Set(float64(counter.Packets))
175
176
case <-ctx.Done():
177
c.mu.Lock()
178
delete(c.limited, ws.InstanceID)
179
c.mu.Unlock()
180
return
181
}
182
}
183
}(ws)
184
185
return nil
186
}
187
188
// Update replaces the limiter configuration with config and logs the change.
// The new values are read the next time a workspace limit is set up.
func (c *ConnLimiter) Update(config Config) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.config = config

	entry := log.WithField("config", config)
	entry.Info("updating network connection limits")
}
195
196