Path: blob/main/components/ee/agent-smith/pkg/agent/agent.go
2501 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package agent56import (7"context"8"fmt"9"net/url"10"sort"11"strings"12"sync"13"time"1415"github.com/prometheus/client_golang/prometheus"16"golang.org/x/xerrors"17"google.golang.org/grpc"18"google.golang.org/grpc/credentials"19"google.golang.org/grpc/credentials/insecure"20"k8s.io/client-go/kubernetes"21"k8s.io/client-go/rest"22"k8s.io/client-go/tools/clientcmd"23"k8s.io/utils/lru"2425"github.com/gitpod-io/gitpod/agent-smith/pkg/classifier"26"github.com/gitpod-io/gitpod/agent-smith/pkg/common"27"github.com/gitpod-io/gitpod/agent-smith/pkg/config"28"github.com/gitpod-io/gitpod/agent-smith/pkg/detector"29common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"30"github.com/gitpod-io/gitpod/common-go/log"31gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"32wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"33)3435const (36// notificationCacheSize is the history size of notifications we don't want to get notified about again37notificationCacheSize = 100038)3940// Smith can perform operations within a users workspace and judge a user41type Smith struct {42Config config.Config43GitpodAPI gitpod.APIInterface44EnforcementRules map[string]config.EnforcementRules45Kubernetes kubernetes.Interface46metrics *metrics4748wsman wsmanapi.WorkspaceManagerClient4950timeElapsedHandler func(t time.Time) time.Duration51notifiedInfringements *lru.Cache5253detector detector.ProcessDetector54classifier classifier.ProcessClassifier55fileDetector detector.FileDetector56fileClassifier classifier.FileClassifier57}5859// NewAgentSmith creates a new agent smith60func NewAgentSmith(cfg config.Config) (*Smith, error) {61// establish default CPU limit penalty62if cfg.Enforcement.CPULimitPenalty == "" {63cfg.Enforcement.CPULimitPenalty = "500m"64}6566var api gitpod.APIInterface67if cfg.GitpodAPI.HostURL != "" {68u, err := url.Parse(cfg.GitpodAPI.HostURL)69if err != nil {70return nil, xerrors.Errorf("cannot parse Gitpod API host url: %w", err)71}72endpoint := fmt.Sprintf("wss://%s/api/v1", u.Hostname())7374api, err = gitpod.ConnectToServer(endpoint, gitpod.ConnectToServerOpts{75Context: context.Background(),76Token: cfg.GitpodAPI.APIToken,77Log: log.Log,78})79if err != nil {80return nil, xerrors.Errorf("cannot connect to Gitpod API: %w", err)81}82}8384var clientset kubernetes.Interface85if cfg.Kubernetes.Enabled {86if cfg.Kubernetes.Kubeconfig != "" {87res, err := clientcmd.BuildConfigFromFlags("", cfg.Kubernetes.Kubeconfig)88if err != nil {89return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)90}91clientset, err = kubernetes.NewForConfig(res)92if err != nil {93return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)94}95} else {96k8s, err := rest.InClusterConfig()97if err != nil {98return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)99}100clientset, err = kubernetes.NewForConfig(k8s)101if err != nil {102return nil, xerrors.Errorf("cannot connect to kubernetes: %w", err)103}104}105}106107grpcOpts := common_grpc.DefaultClientOptions()108if cfg.WorkspaceManager.TLS.Authority != "" || cfg.WorkspaceManager.TLS.Certificate != "" && cfg.WorkspaceManager.TLS.PrivateKey != "" {109tlsConfig, err := common_grpc.ClientAuthTLSConfig(110cfg.WorkspaceManager.TLS.Authority, cfg.WorkspaceManager.TLS.Certificate, cfg.WorkspaceManager.TLS.PrivateKey,111common_grpc.WithSetRootCAs(true),112common_grpc.WithServerName("ws-manager"),113)114if err != nil {115log.WithField("config", cfg.WorkspaceManager.TLS).Error("Cannot load ws-manager certs - this is a configuration issue.")116return nil, xerrors.Errorf("cannot load ws-manager certs: %w", err)117}118119grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))120} else {121grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))122}123conn, err := grpc.Dial(cfg.WorkspaceManager.Address, grpcOpts...)124if err != nil {125return nil, xerrors.Errorf("cannot dial ws-manager-mk2: %w", err)126}127wsman := wsmanapi.NewWorkspaceManagerClient(conn)128129detec, err := detector.NewProcfsDetector()130if err != nil {131return nil, err132}133134class, err := cfg.Blocklists.Classifier()135if err != nil {136return nil, err137}138139// Initialize filesystem detection if enabled140var filesystemDetec detector.FileDetector141var filesystemClass classifier.FileClassifier142if cfg.FilesystemScanning != nil && cfg.FilesystemScanning.Enabled {143// Create filesystem detector config144fsConfig := detector.FileScanningConfig{145Enabled: cfg.FilesystemScanning.Enabled,146ScanInterval: cfg.FilesystemScanning.ScanInterval.Duration,147MaxFileSize: cfg.FilesystemScanning.MaxFileSize,148WorkingArea: cfg.FilesystemScanning.WorkingArea,149}150151// Create independent filesystem classifier (no dependency on process classifier)152filesystemClass, err = cfg.Blocklists.FileClassifier()153if err != nil {154log.WithError(err).Error("failed to create filesystem classifier")155} else {156filesystemDetec, err = detector.NewfileDetector(fsConfig, filesystemClass)157if err != nil {158log.WithError(err).Error("failed to create filesystem detector")159} else {160log.Info("Filesystem detector created successfully with independent classifier")161}162}163}164165m := newAgentMetrics()166res := &Smith{167EnforcementRules: map[string]config.EnforcementRules{168defaultRuleset: {169config.GradeKind(config.InfringementExec, common.SeverityBarely): config.PenaltyLimitCPU,170config.GradeKind(config.InfringementExec, common.SeverityAudit): config.PenaltyStopWorkspace,171config.GradeKind(config.InfringementExec, common.SeverityVery): config.PenaltyStopWorkspaceAndBlockUser,172},173},174Config: cfg,175GitpodAPI: api,176Kubernetes: clientset,177178wsman: wsman,179180detector: detec,181classifier: class,182fileDetector: filesystemDetec,183fileClassifier: filesystemClass,184185notifiedInfringements: lru.New(notificationCacheSize),186metrics: m,187timeElapsedHandler: time.Since,188}189if cfg.Enforcement.Default != nil {190if err := cfg.Enforcement.Default.Validate(); err != nil {191return nil, err192}193res.EnforcementRules[defaultRuleset] = *cfg.Enforcement.Default194}195for repo, rules := range cfg.Enforcement.PerRepo {196if err := rules.Validate(); err != nil {197return nil, err198}199res.EnforcementRules[repo] = rules200}201202return res, nil203}204205// InfringingWorkspace reports a user's wrongdoing in a workspace206type InfringingWorkspace struct {207SupervisorPID int208Namespace string209Pod string210Owner string211InstanceID string212WorkspaceID string213Infringements []Infringement214GitRemoteURL []string215}216217// VID is an ID unique to this set of infringements218func (ws InfringingWorkspace) VID() string {219vt := make([]string, len(ws.Infringements))220for i, v := range ws.Infringements {221vt[i] = string(v.Kind)222}223sort.Slice(vt, func(i, j int) bool { return vt[i] < vt[j] })224225return fmt.Sprintf("%s/%s", ws.Pod, strings.Join(vt, ":"))226}227228// DescibeInfringements returns a string representation of all infringements of this workspace229func (ws InfringingWorkspace) DescribeInfringements(charCount int) string {230res := make([]string, len(ws.Infringements))231for i, v := range ws.Infringements {232res[i] = fmt.Sprintf("%s: %s", v.Kind, v.Description)233}234235infringements := strings.Join(res, "\n")236if len(infringements) > charCount {237infringements = infringements[:charCount]238}239240return infringements241}242243// Infringement reports a users particular wrongdoing244type Infringement struct {245Description string246Kind config.GradedInfringementKind247CommandLine []string248}249250// defaultRuleset is the name ("remote origin URL") of the default enforcement rules251const defaultRuleset = ""252253type classifiedProcess struct {254P detector.Process255C *classifier.Classification256Err error257}258259type classifiedFile struct {260F detector.File261C *classifier.Classification262Err error263}264265// Start gets a stream of Infringements from Run and executes a callback on them to apply a Penalty266func (agent *Smith) Start(ctx context.Context, callback func(InfringingWorkspace, []config.PenaltyKind)) {267ps, err := agent.detector.DiscoverProcesses(ctx)268if err != nil {269log.WithError(err).Fatal("cannot start process detector")270}271272// Start filesystem detection if enabled273var fs <-chan detector.File274if agent.fileDetector != nil {275fs, err = agent.fileDetector.DiscoverFiles(ctx)276if err != nil {277log.WithError(err).Warn("cannot start filesystem detector")278}279}280281var (282wg sync.WaitGroup283cli = make(chan detector.Process, 500)284clo = make(chan classifiedProcess, 50)285fli = make(chan detector.File, 100)286flo = make(chan classifiedFile, 25)287)288agent.metrics.RegisterClassificationQueues(cli, clo)289290workspaces := make(map[int]*common.Workspace)291wsMutex := &sync.Mutex{}292293defer wg.Wait()294for i := 0; i < 25; i++ {295wg.Add(1)296go func() {297defer wg.Done()298for i := range cli {299// Update the workspaces map if this process belongs to a new workspace300wsMutex.Lock()301if _, ok := workspaces[i.Workspace.PID]; !ok {302log.Debugf("adding workspace with pid %d and workspaceId %s to workspaces", i.Workspace.PID, i.Workspace.WorkspaceID)303workspaces[i.Workspace.PID] = i.Workspace304}305wsMutex.Unlock()306// perform classification of the process307class, err := agent.classifier.Matches(i.Path, i.CommandLine)308// optimisation: early out to not block on the CLO chan309if err == nil && class.Level == classifier.LevelNoMatch {310continue311}312clo <- classifiedProcess{P: i, C: class, Err: err}313}314}()315}316317// Filesystem classification workers (fewer than process workers)318if agent.fileClassifier != nil {319for i := 0; i < 5; i++ {320wg.Add(1)321go func() {322defer wg.Done()323for file := range fli {324class, err := agent.fileClassifier.MatchesFile(file.Path)325if err == nil && class.Level == classifier.LevelNoMatch {326log.Infof("File classification: no match - %s", file.Path)327continue328}329log.Infof("File classification result: %s (level: %s, err: %v)", file.Path, class.Level, err)330flo <- classifiedFile{F: file, C: class, Err: err}331}332}()333}334}335336defer log.Info("agent smith main loop ended")337338// We want to fill the classifier in a Go routine seaparete from using the classification339// results, to ensure we're not deadlocking/block ourselves. If this were in the same loop,340// we could easily get into a situation where we'd need to scale the queues to match the proc index.341go func() {342for {343select {344case <-ctx.Done():345return346case proc, ok := <-ps:347if !ok {348return349}350select {351case cli <- proc:352default:353// we're overfilling the classifier worker354agent.metrics.classificationBackpressureInDrop.Inc()355}356case file, ok := <-fs:357if !ok {358continue359}360select {361case fli <- file:362default:363// filesystem queue full, skip this file364}365}366}367}()368369for {370select {371case <-ctx.Done():372return373case class := <-clo:374proc, cl, err := class.P, class.C, class.Err375if err != nil {376log.WithError(err).WithFields(log.OWI(proc.Workspace.OwnerID, proc.Workspace.WorkspaceID, proc.Workspace.InstanceID)).WithField("path", proc.Path).Error("cannot classify process")377continue378}379if cl == nil || cl.Level == classifier.LevelNoMatch {380continue381}382383_, _ = agent.Penalize(InfringingWorkspace{384SupervisorPID: proc.Workspace.PID,385Owner: proc.Workspace.OwnerID,386InstanceID: proc.Workspace.InstanceID,387GitRemoteURL: []string{proc.Workspace.GitURL},388Infringements: []Infringement{389{390Kind: config.GradeKind(config.InfringementExec, common.Severity(cl.Level)),391Description: fmt.Sprintf("%s: %s", cl.Classifier, cl.Message),392CommandLine: proc.CommandLine,393},394},395})396case fileClass := <-flo:397log.Infof("Received classified file from flo channel")398file, cl, err := fileClass.F, fileClass.C, fileClass.Err399if err != nil {400log.WithError(err).WithFields(log.OWI(file.Workspace.OwnerID, file.Workspace.WorkspaceID, file.Workspace.InstanceID)).WithField("path", file.Path).Error("cannot classify filesystem file")401continue402}403404log.WithField("path", file.Path).WithField("severity", cl.Level).WithField("message", cl.Message).405WithFields(log.OWI(file.Workspace.OwnerID, file.Workspace.WorkspaceID, file.Workspace.InstanceID)).406Info("filesystem signature detected")407408_, _ = agent.Penalize(InfringingWorkspace{409SupervisorPID: file.Workspace.PID,410Owner: file.Workspace.OwnerID,411InstanceID: file.Workspace.InstanceID,412WorkspaceID: file.Workspace.WorkspaceID,413GitRemoteURL: []string{file.Workspace.GitURL},414Infringements: []Infringement{415{416Kind: config.GradeKind(config.InfringementExec, common.Severity(cl.Level)), // Reuse exec for now417Description: fmt.Sprintf("filesystem signature: %s", cl.Message),418CommandLine: []string{file.Path}, // Use file path as "command"419},420},421})422}423}424}425426// Penalize acts on infringements and e.g. stops pods427func (agent *Smith) Penalize(ws InfringingWorkspace) ([]config.PenaltyKind, error) {428var remoteURL string429if len(ws.GitRemoteURL) > 0 {430remoteURL = ws.GitRemoteURL[0]431}432433owi := log.OWI(ws.Owner, ws.WorkspaceID, ws.InstanceID)434435penalty := getPenalty(agent.EnforcementRules[defaultRuleset], agent.EnforcementRules[remoteURL], ws.Infringements)436for _, p := range penalty {437switch p {438case config.PenaltyStopWorkspace:439log.WithField("infringement", log.TrustedValueWrap{Value: ws.Infringements}).WithFields(owi).Info("stopping workspace")440agent.metrics.penaltyAttempts.WithLabelValues(string(p)).Inc()441err := agent.stopWorkspace(ws.SupervisorPID, ws.InstanceID)442if err != nil {443log.WithError(err).WithFields(owi).Debug("failed to stop workspace")444agent.metrics.penaltyFailures.WithLabelValues(string(p), err.Error()).Inc()445}446return penalty, err447case config.PenaltyStopWorkspaceAndBlockUser:448log.WithField("infringement", log.TrustedValueWrap{Value: ws.Infringements}).WithFields(owi).Info("stopping workspace and blocking user")449agent.metrics.penaltyAttempts.WithLabelValues(string(p)).Inc()450err := agent.stopWorkspaceAndBlockUser(ws.SupervisorPID, ws.Owner, ws.WorkspaceID, ws.InstanceID)451if err != nil {452log.WithError(err).WithFields(owi).Debug("failed to stop workspace and block user")453agent.metrics.penaltyFailures.WithLabelValues(string(p), err.Error()).Inc()454}455return penalty, err456case config.PenaltyLimitCPU:457log.WithField("infringement", log.TrustedValueWrap{Value: ws.Infringements}).WithFields(owi).Info("limiting CPU")458agent.metrics.penaltyAttempts.WithLabelValues(string(p)).Inc()459err := agent.limitCPUUse(ws.Pod)460if err != nil {461log.WithError(err).WithFields(owi).Debug("failed to limit CPU")462agent.metrics.penaltyFailures.WithLabelValues(string(p), err.Error()).Inc()463}464return penalty, err465}466}467468return penalty, nil469}470471func findEnforcementRules(rules map[string]config.EnforcementRules, remoteURL string) config.EnforcementRules {472res, ok := rules[remoteURL]473if ok {474return res475}476477for k, v := range rules {478hp, hs := strings.HasPrefix(k, "*"), strings.HasSuffix(k, "*")479if hp && hs && strings.Contains(strings.ToLower(remoteURL), strings.Trim(k, "*")) {480return v481}482if hp && strings.HasSuffix(strings.ToLower(remoteURL), strings.Trim(k, "*")) {483return v484}485if hs && strings.HasPrefix(strings.ToLower(remoteURL), strings.Trim(k, "*")) {486return v487}488}489490return nil491}492493// getPenalty decides what kind of penalty should be applied for a set of infringements.494// The penalty list will never contain PenaltyNone, but may be empty495func getPenalty(defaultRules, perRepoRules config.EnforcementRules, vs []Infringement) []config.PenaltyKind {496res := make(map[config.PenaltyKind]struct{})497for _, v := range vs {498p, ok := perRepoRules[v.Kind]499if ok {500res[p] = struct{}{}501continue502}503p, ok = defaultRules[v.Kind]504if ok {505res[p] = struct{}{}506}507}508509var ps []config.PenaltyKind510for k := range res {511if k == config.PenaltyNone {512continue513}514ps = append(ps, k)515}516return ps517}518519func (agent *Smith) Describe(d chan<- *prometheus.Desc) {520agent.metrics.Describe(d)521agent.classifier.Describe(d)522agent.detector.Describe(d)523}524525func (agent *Smith) Collect(m chan<- prometheus.Metric) {526agent.metrics.Collect(m)527agent.classifier.Collect(m)528agent.detector.Collect(m)529}530531532