Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ee/agent-smith/pkg/agent/agent.go
2501 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package agent
6
7
import (
8
"context"
9
"fmt"
10
"net/url"
11
"sort"
12
"strings"
13
"sync"
14
"time"
15
16
"github.com/prometheus/client_golang/prometheus"
17
"golang.org/x/xerrors"
18
"google.golang.org/grpc"
19
"google.golang.org/grpc/credentials"
20
"google.golang.org/grpc/credentials/insecure"
21
"k8s.io/client-go/kubernetes"
22
"k8s.io/client-go/rest"
23
"k8s.io/client-go/tools/clientcmd"
24
"k8s.io/utils/lru"
25
26
"github.com/gitpod-io/gitpod/agent-smith/pkg/classifier"
27
"github.com/gitpod-io/gitpod/agent-smith/pkg/common"
28
"github.com/gitpod-io/gitpod/agent-smith/pkg/config"
29
"github.com/gitpod-io/gitpod/agent-smith/pkg/detector"
30
common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
31
"github.com/gitpod-io/gitpod/common-go/log"
32
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
33
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
34
)
35
36
// notificationCacheSize is the number of past notifications that are remembered
// so that the same infringement is not reported again.
const notificationCacheSize = 1000
40
41
// Smith can perform operations within a users workspace and judge a user
42
type Smith struct {
43
Config config.Config
44
GitpodAPI gitpod.APIInterface
45
EnforcementRules map[string]config.EnforcementRules
46
Kubernetes kubernetes.Interface
47
metrics *metrics
48
49
wsman wsmanapi.WorkspaceManagerClient
50
51
timeElapsedHandler func(t time.Time) time.Duration
52
notifiedInfringements *lru.Cache
53
54
detector detector.ProcessDetector
55
classifier classifier.ProcessClassifier
56
fileDetector detector.FileDetector
57
fileClassifier classifier.FileClassifier
58
}
59
60
// NewAgentSmith creates a new agent smith
61
func NewAgentSmith(cfg config.Config) (*Smith, error) {
62
// establish default CPU limit penalty
63
if cfg.Enforcement.CPULimitPenalty == "" {
64
cfg.Enforcement.CPULimitPenalty = "500m"
65
}
66
67
var api gitpod.APIInterface
68
if cfg.GitpodAPI.HostURL != "" {
69
u, err := url.Parse(cfg.GitpodAPI.HostURL)
70
if err != nil {
71
return nil, xerrors.Errorf("cannot parse Gitpod API host url: %w", err)
72
}
73
endpoint := fmt.Sprintf("wss://%s/api/v1", u.Hostname())
74
75
api, err = gitpod.ConnectToServer(endpoint, gitpod.ConnectToServerOpts{
76
Context: context.Background(),
77
Token: cfg.GitpodAPI.APIToken,
78
Log: log.Log,
79
})
80
if err != nil {
81
return nil, xerrors.Errorf("cannot connect to Gitpod API: %w", err)
82
}
83
}
84
85
var clientset kubernetes.Interface
86
if cfg.Kubernetes.Enabled {
87
if cfg.Kubernetes.Kubeconfig != "" {
88
res, err := clientcmd.BuildConfigFromFlags("", cfg.Kubernetes.Kubeconfig)
89
if err != nil {
90
return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)
91
}
92
clientset, err = kubernetes.NewForConfig(res)
93
if err != nil {
94
return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)
95
}
96
} else {
97
k8s, err := rest.InClusterConfig()
98
if err != nil {
99
return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)
100
}
101
clientset, err = kubernetes.NewForConfig(k8s)
102
if err != nil {
103
return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)
104
}
105
}
106
}
107
108
grpcOpts := common_grpc.DefaultClientOptions()
109
if cfg.WorkspaceManager.TLS.Authority != "" || cfg.WorkspaceManager.TLS.Certificate != "" && cfg.WorkspaceManager.TLS.PrivateKey != "" {
110
tlsConfig, err := common_grpc.ClientAuthTLSConfig(
111
cfg.WorkspaceManager.TLS.Authority, cfg.WorkspaceManager.TLS.Certificate, cfg.WorkspaceManager.TLS.PrivateKey,
112
common_grpc.WithSetRootCAs(true),
113
common_grpc.WithServerName("ws-manager"),
114
)
115
if err != nil {
116
log.WithField("config", cfg.WorkspaceManager.TLS).Error("Cannot load ws-manager certs - this is a configuration issue.")
117
return nil, xerrors.Errorf("cannot load ws-manager certs: %w", err)
118
}
119
120
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
121
} else {
122
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
123
}
124
conn, err := grpc.Dial(cfg.WorkspaceManager.Address, grpcOpts...)
125
if err != nil {
126
return nil, xerrors.Errorf("cannot dial ws-manager-mk2: %w", err)
127
}
128
wsman := wsmanapi.NewWorkspaceManagerClient(conn)
129
130
detec, err := detector.NewProcfsDetector()
131
if err != nil {
132
return nil, err
133
}
134
135
class, err := cfg.Blocklists.Classifier()
136
if err != nil {
137
return nil, err
138
}
139
140
// Initialize filesystem detection if enabled
141
var filesystemDetec detector.FileDetector
142
var filesystemClass classifier.FileClassifier
143
if cfg.FilesystemScanning != nil && cfg.FilesystemScanning.Enabled {
144
// Create filesystem detector config
145
fsConfig := detector.FileScanningConfig{
146
Enabled: cfg.FilesystemScanning.Enabled,
147
ScanInterval: cfg.FilesystemScanning.ScanInterval.Duration,
148
MaxFileSize: cfg.FilesystemScanning.MaxFileSize,
149
WorkingArea: cfg.FilesystemScanning.WorkingArea,
150
}
151
152
// Create independent filesystem classifier (no dependency on process classifier)
153
filesystemClass, err = cfg.Blocklists.FileClassifier()
154
if err != nil {
155
log.WithError(err).Error("failed to create filesystem classifier")
156
} else {
157
filesystemDetec, err = detector.NewfileDetector(fsConfig, filesystemClass)
158
if err != nil {
159
log.WithError(err).Error("failed to create filesystem detector")
160
} else {
161
log.Info("Filesystem detector created successfully with independent classifier")
162
}
163
}
164
}
165
166
m := newAgentMetrics()
167
res := &Smith{
168
EnforcementRules: map[string]config.EnforcementRules{
169
defaultRuleset: {
170
config.GradeKind(config.InfringementExec, common.SeverityBarely): config.PenaltyLimitCPU,
171
config.GradeKind(config.InfringementExec, common.SeverityAudit): config.PenaltyStopWorkspace,
172
config.GradeKind(config.InfringementExec, common.SeverityVery): config.PenaltyStopWorkspaceAndBlockUser,
173
},
174
},
175
Config: cfg,
176
GitpodAPI: api,
177
Kubernetes: clientset,
178
179
wsman: wsman,
180
181
detector: detec,
182
classifier: class,
183
fileDetector: filesystemDetec,
184
fileClassifier: filesystemClass,
185
186
notifiedInfringements: lru.New(notificationCacheSize),
187
metrics: m,
188
timeElapsedHandler: time.Since,
189
}
190
if cfg.Enforcement.Default != nil {
191
if err := cfg.Enforcement.Default.Validate(); err != nil {
192
return nil, err
193
}
194
res.EnforcementRules[defaultRuleset] = *cfg.Enforcement.Default
195
}
196
for repo, rules := range cfg.Enforcement.PerRepo {
197
if err := rules.Validate(); err != nil {
198
return nil, err
199
}
200
res.EnforcementRules[repo] = rules
201
}
202
203
return res, nil
204
}
205
206
// InfringingWorkspace reports a user's wrongdoing in a workspace
207
type InfringingWorkspace struct {
208
SupervisorPID int
209
Namespace string
210
Pod string
211
Owner string
212
InstanceID string
213
WorkspaceID string
214
Infringements []Infringement
215
GitRemoteURL []string
216
}
217
218
// VID is an ID unique to this set of infringements
219
func (ws InfringingWorkspace) VID() string {
220
vt := make([]string, len(ws.Infringements))
221
for i, v := range ws.Infringements {
222
vt[i] = string(v.Kind)
223
}
224
sort.Slice(vt, func(i, j int) bool { return vt[i] < vt[j] })
225
226
return fmt.Sprintf("%s/%s", ws.Pod, strings.Join(vt, ":"))
227
}
228
229
// DescibeInfringements returns a string representation of all infringements of this workspace
230
func (ws InfringingWorkspace) DescribeInfringements(charCount int) string {
231
res := make([]string, len(ws.Infringements))
232
for i, v := range ws.Infringements {
233
res[i] = fmt.Sprintf("%s: %s", v.Kind, v.Description)
234
}
235
236
infringements := strings.Join(res, "\n")
237
if len(infringements) > charCount {
238
infringements = infringements[:charCount]
239
}
240
241
return infringements
242
}
243
244
// Infringement reports a users particular wrongdoing
245
type Infringement struct {
246
Description string
247
Kind config.GradedInfringementKind
248
CommandLine []string
249
}
250
251
// defaultRuleset is the key (in place of a "remote origin URL") under which the
// default enforcement rules are stored.
const defaultRuleset = ""
253
254
type classifiedProcess struct {
255
P detector.Process
256
C *classifier.Classification
257
Err error
258
}
259
260
type classifiedFile struct {
261
F detector.File
262
C *classifier.Classification
263
Err error
264
}
265
266
// Start gets a stream of Infringements from Run and executes a callback on them to apply a Penalty
267
func (agent *Smith) Start(ctx context.Context, callback func(InfringingWorkspace, []config.PenaltyKind)) {
268
ps, err := agent.detector.DiscoverProcesses(ctx)
269
if err != nil {
270
log.WithError(err).Fatal("cannot start process detector")
271
}
272
273
// Start filesystem detection if enabled
274
var fs <-chan detector.File
275
if agent.fileDetector != nil {
276
fs, err = agent.fileDetector.DiscoverFiles(ctx)
277
if err != nil {
278
log.WithError(err).Warn("cannot start filesystem detector")
279
}
280
}
281
282
var (
283
wg sync.WaitGroup
284
cli = make(chan detector.Process, 500)
285
clo = make(chan classifiedProcess, 50)
286
fli = make(chan detector.File, 100)
287
flo = make(chan classifiedFile, 25)
288
)
289
agent.metrics.RegisterClassificationQueues(cli, clo)
290
291
workspaces := make(map[int]*common.Workspace)
292
wsMutex := &sync.Mutex{}
293
294
defer wg.Wait()
295
for i := 0; i < 25; i++ {
296
wg.Add(1)
297
go func() {
298
defer wg.Done()
299
for i := range cli {
300
// Update the workspaces map if this process belongs to a new workspace
301
wsMutex.Lock()
302
if _, ok := workspaces[i.Workspace.PID]; !ok {
303
log.Debugf("adding workspace with pid %d and workspaceId %s to workspaces", i.Workspace.PID, i.Workspace.WorkspaceID)
304
workspaces[i.Workspace.PID] = i.Workspace
305
}
306
wsMutex.Unlock()
307
// perform classification of the process
308
class, err := agent.classifier.Matches(i.Path, i.CommandLine)
309
// optimisation: early out to not block on the CLO chan
310
if err == nil && class.Level == classifier.LevelNoMatch {
311
continue
312
}
313
clo <- classifiedProcess{P: i, C: class, Err: err}
314
}
315
}()
316
}
317
318
// Filesystem classification workers (fewer than process workers)
319
if agent.fileClassifier != nil {
320
for i := 0; i < 5; i++ {
321
wg.Add(1)
322
go func() {
323
defer wg.Done()
324
for file := range fli {
325
class, err := agent.fileClassifier.MatchesFile(file.Path)
326
if err == nil && class.Level == classifier.LevelNoMatch {
327
log.Infof("File classification: no match - %s", file.Path)
328
continue
329
}
330
log.Infof("File classification result: %s (level: %s, err: %v)", file.Path, class.Level, err)
331
flo <- classifiedFile{F: file, C: class, Err: err}
332
}
333
}()
334
}
335
}
336
337
defer log.Info("agent smith main loop ended")
338
339
// We want to fill the classifier in a Go routine seaparete from using the classification
340
// results, to ensure we're not deadlocking/block ourselves. If this were in the same loop,
341
// we could easily get into a situation where we'd need to scale the queues to match the proc index.
342
go func() {
343
for {
344
select {
345
case <-ctx.Done():
346
return
347
case proc, ok := <-ps:
348
if !ok {
349
return
350
}
351
select {
352
case cli <- proc:
353
default:
354
// we're overfilling the classifier worker
355
agent.metrics.classificationBackpressureInDrop.Inc()
356
}
357
case file, ok := <-fs:
358
if !ok {
359
continue
360
}
361
select {
362
case fli <- file:
363
default:
364
// filesystem queue full, skip this file
365
}
366
}
367
}
368
}()
369
370
for {
371
select {
372
case <-ctx.Done():
373
return
374
case class := <-clo:
375
proc, cl, err := class.P, class.C, class.Err
376
if err != nil {
377
log.WithError(err).WithFields(log.OWI(proc.Workspace.OwnerID, proc.Workspace.WorkspaceID, proc.Workspace.InstanceID)).WithField("path", proc.Path).Error("cannot classify process")
378
continue
379
}
380
if cl == nil || cl.Level == classifier.LevelNoMatch {
381
continue
382
}
383
384
_, _ = agent.Penalize(InfringingWorkspace{
385
SupervisorPID: proc.Workspace.PID,
386
Owner: proc.Workspace.OwnerID,
387
InstanceID: proc.Workspace.InstanceID,
388
GitRemoteURL: []string{proc.Workspace.GitURL},
389
Infringements: []Infringement{
390
{
391
Kind: config.GradeKind(config.InfringementExec, common.Severity(cl.Level)),
392
Description: fmt.Sprintf("%s: %s", cl.Classifier, cl.Message),
393
CommandLine: proc.CommandLine,
394
},
395
},
396
})
397
case fileClass := <-flo:
398
log.Infof("Received classified file from flo channel")
399
file, cl, err := fileClass.F, fileClass.C, fileClass.Err
400
if err != nil {
401
log.WithError(err).WithFields(log.OWI(file.Workspace.OwnerID, file.Workspace.WorkspaceID, file.Workspace.InstanceID)).WithField("path", file.Path).Error("cannot classify filesystem file")
402
continue
403
}
404
405
log.WithField("path", file.Path).WithField("severity", cl.Level).WithField("message", cl.Message).
406
WithFields(log.OWI(file.Workspace.OwnerID, file.Workspace.WorkspaceID, file.Workspace.InstanceID)).
407
Info("filesystem signature detected")
408
409
_, _ = agent.Penalize(InfringingWorkspace{
410
SupervisorPID: file.Workspace.PID,
411
Owner: file.Workspace.OwnerID,
412
InstanceID: file.Workspace.InstanceID,
413
WorkspaceID: file.Workspace.WorkspaceID,
414
GitRemoteURL: []string{file.Workspace.GitURL},
415
Infringements: []Infringement{
416
{
417
Kind: config.GradeKind(config.InfringementExec, common.Severity(cl.Level)), // Reuse exec for now
418
Description: fmt.Sprintf("filesystem signature: %s", cl.Message),
419
CommandLine: []string{file.Path}, // Use file path as "command"
420
},
421
},
422
})
423
}
424
}
425
}
426
427
// Penalize acts on infringements and e.g. stops pods
428
func (agent *Smith) Penalize(ws InfringingWorkspace) ([]config.PenaltyKind, error) {
429
var remoteURL string
430
if len(ws.GitRemoteURL) > 0 {
431
remoteURL = ws.GitRemoteURL[0]
432
}
433
434
owi := log.OWI(ws.Owner, ws.WorkspaceID, ws.InstanceID)
435
436
penalty := getPenalty(agent.EnforcementRules[defaultRuleset], agent.EnforcementRules[remoteURL], ws.Infringements)
437
for _, p := range penalty {
438
switch p {
439
case config.PenaltyStopWorkspace:
440
log.WithField("infringement", log.TrustedValueWrap{Value: ws.Infringements}).WithFields(owi).Info("stopping workspace")
441
agent.metrics.penaltyAttempts.WithLabelValues(string(p)).Inc()
442
err := agent.stopWorkspace(ws.SupervisorPID, ws.InstanceID)
443
if err != nil {
444
log.WithError(err).WithFields(owi).Debug("failed to stop workspace")
445
agent.metrics.penaltyFailures.WithLabelValues(string(p), err.Error()).Inc()
446
}
447
return penalty, err
448
case config.PenaltyStopWorkspaceAndBlockUser:
449
log.WithField("infringement", log.TrustedValueWrap{Value: ws.Infringements}).WithFields(owi).Info("stopping workspace and blocking user")
450
agent.metrics.penaltyAttempts.WithLabelValues(string(p)).Inc()
451
err := agent.stopWorkspaceAndBlockUser(ws.SupervisorPID, ws.Owner, ws.WorkspaceID, ws.InstanceID)
452
if err != nil {
453
log.WithError(err).WithFields(owi).Debug("failed to stop workspace and block user")
454
agent.metrics.penaltyFailures.WithLabelValues(string(p), err.Error()).Inc()
455
}
456
return penalty, err
457
case config.PenaltyLimitCPU:
458
log.WithField("infringement", log.TrustedValueWrap{Value: ws.Infringements}).WithFields(owi).Info("limiting CPU")
459
agent.metrics.penaltyAttempts.WithLabelValues(string(p)).Inc()
460
err := agent.limitCPUUse(ws.Pod)
461
if err != nil {
462
log.WithError(err).WithFields(owi).Debug("failed to limit CPU")
463
agent.metrics.penaltyFailures.WithLabelValues(string(p), err.Error()).Inc()
464
}
465
return penalty, err
466
}
467
}
468
469
return penalty, nil
470
}
471
472
// findEnforcementRules returns the enforcement rules registered for remoteURL.
//
// An exact key match wins. Otherwise keys are treated as patterns with a
// leading and/or trailing "*": "*x*" matches URLs containing x, "*x" matches
// URLs ending in x, and "x*" matches URLs starting with x. The URL is
// lower-cased before pattern matching, the pattern is not. If several patterns
// match, which one is returned depends on map iteration order. If nothing
// matches, nil is returned.
func findEnforcementRules(rules map[string]config.EnforcementRules, remoteURL string) config.EnforcementRules {
	if exact, found := rules[remoteURL]; found {
		return exact
	}

	lowered := strings.ToLower(remoteURL)
	for pattern, candidate := range rules {
		needle := strings.Trim(pattern, "*")
		leading := strings.HasPrefix(pattern, "*")
		trailing := strings.HasSuffix(pattern, "*")

		switch {
		case leading && trailing && strings.Contains(lowered, needle):
			return candidate
		case leading && strings.HasSuffix(lowered, needle):
			return candidate
		case trailing && strings.HasPrefix(lowered, needle):
			return candidate
		}
	}

	return nil
}
493
494
// getPenalty decides what kind of penalty should be applied for a set of infringements.
495
// The penalty list will never contain PenaltyNone, but may be empty
496
func getPenalty(defaultRules, perRepoRules config.EnforcementRules, vs []Infringement) []config.PenaltyKind {
497
res := make(map[config.PenaltyKind]struct{})
498
for _, v := range vs {
499
p, ok := perRepoRules[v.Kind]
500
if ok {
501
res[p] = struct{}{}
502
continue
503
}
504
p, ok = defaultRules[v.Kind]
505
if ok {
506
res[p] = struct{}{}
507
}
508
}
509
510
var ps []config.PenaltyKind
511
for k := range res {
512
if k == config.PenaltyNone {
513
continue
514
}
515
ps = append(ps, k)
516
}
517
return ps
518
}
519
520
// Describe forwards d to the agent's own metrics, the process classifier and
// the process detector, in that order.
func (agent *Smith) Describe(d chan<- *prometheus.Desc) {
	for _, c := range []prometheus.Collector{agent.metrics, agent.classifier, agent.detector} {
		c.Describe(d)
	}
}
525
526
// Collect forwards m to the agent's own metrics, the process classifier and
// the process detector, in that order.
func (agent *Smith) Collect(m chan<- prometheus.Metric) {
	for _, c := range []prometheus.Collector{agent.metrics, agent.classifier, agent.detector} {
		c.Collect(m)
	}
}
531
532