Path: blob/main/components/ee/agent-smith/pkg/detector/proc.go
2501 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package detector56import (7"bufio"8"bytes"9"context"10"encoding/binary"11"fmt"12"io"13"os"14"path/filepath"15"sort"16"strconv"17"strings"18"sync"19"time"2021"github.com/cespare/xxhash/v2"22"github.com/gitpod-io/gitpod/agent-smith/pkg/common"23"github.com/gitpod-io/gitpod/common-go/log"24lru "github.com/hashicorp/golang-lru"25"github.com/prometheus/client_golang/prometheus"26"github.com/prometheus/procfs"27)2829type discoverableProcFS interface {30Discover() map[int]*process31Environ(pid int) ([]string, error)32}3334type realProcfs procfs.FS3536var _ discoverableProcFS = realProcfs{}3738func (fs realProcfs) Discover() map[int]*process {39proc := procfs.FS(fs)40procs, err := proc.AllProcs()41if err != nil {42log.WithError(err).Error("cannot list processes")43}44sort.Sort(procs)4546idx := make(map[int]*process, len(procs))4748digest := make([]byte, 24)49for _, p := range procs {50cmdline, err := p.CmdLine()51if err != nil {52log.WithField("pid", p.PID).WithError(err).Debug("cannot get commandline of process")53continue54}55stat, err := statProc(p.PID)56if err != nil {57log.WithField("pid", p.PID).WithError(err).Debug("cannot stat process")58continue59}60// Note: don't use p.Executable() here because it resolves the exe symlink which yields61// a path that doesn't make sense in this mount namespace. However, reading from this62// file directly works.63path := filepath.Join("proc", strconv.Itoa(p.PID), "exe")6465// Even though we loop through a sorted process list (lowest PID first), we cannot66// assume that we've seen the parent already due to PID reuse.67parent, ok := idx[stat.PPID]68if !ok {69parent = &process{PID: stat.PPID}70idx[parent.PID] = parent71}72proc, ok := idx[p.PID]73if !ok {74proc = &process{PID: p.PID, Leaf: true}75}76proc.Cmdline = cmdline77proc.Parent = parent78proc.Kind = ProcessUnknown79proc.Path = path80parent.Children = append(parent.Children, proc)8182binary.LittleEndian.PutUint64(digest[0:8], uint64(p.PID))83binary.LittleEndian.PutUint64(digest[8:16], uint64(stat.PPID))84binary.LittleEndian.PutUint64(digest[16:24], stat.Starttime)85proc.Hash = xxhash.Sum64(digest)8687idx[p.PID] = proc88}89return idx90}9192type stat struct {93PPID int94Starttime uint6495}9697// statProc returns a limited set of /proc/<pid>/stat content.98func statProc(pid int) (*stat, error) {99f, err := os.Open(fmt.Sprintf("/proc/%d/stat", pid))100if err != nil {101return nil, err102}103defer f.Close()104105return parseStat(f)106}107108func parseStat(r io.Reader) (res *stat, err error) {109var (110ppid uint64111foundPPID bool112starttime uint64113i = -1114)115116scan := bufio.NewScanner(r)117// We use a fixed buffer size assuming that none of the env vars we're interested in is any larger.118// This is part of the trick to keep allocs down.119scan.Buffer(make([]byte, 512), 512)120scan.Split(scanFixedSpace(512))121for scan.Scan() {122text := scan.Bytes()123if text[len(text)-1] == ')' {124i = 0125}126127if i == 2 {128ppid, err = strconv.ParseUint(string(text), 10, 64)129foundPPID = true130}131if i == 20 {132starttime, err = strconv.ParseUint(string(text), 10, 64)133}134if err != nil {135return136}137138if i >= 0 {139i++140}141}142if err != nil {143return nil, err144}145if err := scan.Err(); err != nil {146return nil, err147}148149if !foundPPID || starttime == 0 {150return nil, fmt.Errorf("cannot parse stat")151}152153return &stat{154PPID: int(ppid),155Starttime: starttime,156}, nil157}158159func (p realProcfs) Environ(pid int) ([]string, error) {160// Note: procfs.Environ is too expensive becuase it uses io.ReadAll which leaks161// memory over time.162163f, err := os.Open(fmt.Sprintf("/proc/%d/environ", pid))164if err != nil {165return nil, err166}167defer f.Close()168169return parseGitpodEnviron(f)170}171172func parseGitpodEnviron(r io.Reader) ([]string, error) {173// Note: this function is benchmarked in BenchmarkParseGitpodEnviron.174// At the time of this wriging it consumed 3+N allocs where N is the number of175// env vars starting with GITPOD_.176//177// When making changes to this function, ensure you're not causing more allocs178// which could have a too drastic resource usage effect in prod.179180scan := bufio.NewScanner(r)181// We use a fixed buffer size assuming that none of the env vars we're interested in is any larger.182// This is part of the trick to keep allocs down.183scan.Buffer(make([]byte, 512), 512)184scan.Split(scanNullTerminatedLines(512))185186// we expect at least 10 relevant env vars187res := make([]string, 0, 10)188for scan.Scan() {189// we only keep GITPOD_ variables for optimisation190text := scan.Bytes()191if !bytes.HasPrefix(text, []byte("GITPOD_")) {192continue193}194195res = append(res, string(text))196}197return res, nil198}199200func scanNullTerminatedLines(fixedBufferSize int) func(data []byte, atEOF bool) (advance int, token []byte, err error) {201return func(data []byte, atEOF bool) (advance int, token []byte, err error) {202if atEOF && len(data) == 0 {203return 0, nil, nil204}205if i := bytes.IndexByte(data, 0); i >= 0 {206// We have a full null-terminated line.207return i + 1, data[:i], nil208}209// If we're at EOF, we have a final, non-terminated line. Return it.210if atEOF {211return len(data), data, nil212}213if len(data) == 512 {214return len(data), data, nil215}216// Request more data.217return 0, nil, nil218}219}220221func scanFixedSpace(fixedBufferSize int) func(data []byte, atEOF bool) (advance int, token []byte, err error) {222// The returned function behaves like bufio.ScanLines except that it doesn't try to223// request lines longer than fixedBufferSize.224return func(data []byte, atEOF bool) (advance int, token []byte, err error) {225if atEOF && len(data) == 0 {226return 0, nil, nil227}228if i := bytes.IndexByte(data, ' '); i >= 0 {229// We have a full null-terminated line.230return i + 1, data[:i], nil231}232// If we're at EOF, we have a final, non-terminated line. Return it.233if atEOF {234return len(data), data, nil235}236if len(data) == 512 {237return len(data), data, nil238}239// Request more data.240return 0, nil, nil241}242}243244var _ ProcessDetector = &ProcfsDetector{}245246// ProcfsDetector detects processes and workspaces on this node by scanning procfs247type ProcfsDetector struct {248mu sync.RWMutex249ps chan Process250251indexSizeGuage prometheus.Gauge252cacheUseCounterVec *prometheus.CounterVec253workspaceGauge prometheus.Gauge254255startOnce sync.Once256257proc discoverableProcFS258cache *lru.Cache259}260261func NewProcfsDetector() (*ProcfsDetector, error) {262p, err := procfs.NewFS("/proc")263if err != nil {264return nil, err265}266267cache, err := lru.New(2000)268if err != nil {269return nil, err270}271272return &ProcfsDetector{273indexSizeGuage: prometheus.NewGauge(prometheus.GaugeOpts{274Namespace: "gitpod",275Subsystem: "agent_smith_procfs_detector",276Name: "index_size",277Help: "number of entries in the last procfs scan index",278}),279cacheUseCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{280Namespace: "gitpod",281Subsystem: "agent_smith_procfs_detector",282Name: "cache_use_total",283Help: "process cache statistics",284}, []string{"use"}),285workspaceGauge: prometheus.NewGauge(prometheus.GaugeOpts{286Namespace: "gitpod",287Subsystem: "agent_smith_procfs_detector",288Name: "workspace_count",289Help: "number of detected workspaces",290}),291proc: realProcfs(p),292cache: cache,293}, nil294}295296func (det *ProcfsDetector) Describe(d chan<- *prometheus.Desc) {297det.indexSizeGuage.Describe(d)298det.cacheUseCounterVec.Describe(d)299det.workspaceGauge.Describe(d)300}301302func (det *ProcfsDetector) Collect(m chan<- prometheus.Metric) {303det.indexSizeGuage.Collect(m)304det.cacheUseCounterVec.Collect(m)305det.workspaceGauge.Collect(m)306}307308func (det *ProcfsDetector) start() {309ps := make(chan Process, 100)310go func() {311t := time.NewTicker(30 * time.Second)312defer t.Stop()313314for {315det.run(ps)316<-t.C317}318}()319go func() {320for p := range ps {321det.ps <- p322}323}()324log.Info("procfs detector started")325}326327type process struct {328PID int329Depth int330Path string331Kind ProcessKind332Parent *process333Children []*process334Leaf bool335Cmdline []string336Workspace *common.Workspace337Hash uint64338}339340func (det *ProcfsDetector) run(processes chan<- Process) {341log.Debug("procfs detector run")342idx := det.proc.Discover()343344// We now have a complete view of the process table. Let's calculate the depths345root, ok := idx[1]346if !ok {347log.Error("cannot find pid 1")348return349}350det.indexSizeGuage.Set(float64(len(idx)))351352// let's find all workspaces, from the root down353findWorkspaces(det.proc, root, 0, nil)354355workspaces := 0356for _, p := range idx {357if p.Workspace == nil {358continue359}360361if p.Kind == ProcessSandbox {362workspaces = workspaces + 1363}364365if p.Kind != ProcessUserWorkload {366continue367}368369if _, ok := det.cache.Get(p.Hash); ok {370det.cacheUseCounterVec.WithLabelValues("hit").Inc()371continue372}373det.cacheUseCounterVec.WithLabelValues("miss").Inc()374det.cache.Add(p.Hash, struct{}{})375376proc := Process{377Path: p.Path,378CommandLine: p.Cmdline,379Kind: p.Kind,380Workspace: p.Workspace,381}382log.WithField("proc", proc).Debug("found process")383processes <- proc384}385386det.workspaceGauge.Set(float64(workspaces))387}388389func findWorkspaces(proc discoverableProcFS, p *process, d int, ws *common.Workspace) {390p.Depth = d391p.Workspace = ws392if ws == nil {393p.Kind = ProcessUnknown394395if len(p.Cmdline) >= 2 && p.Cmdline[0] == "/proc/self/exe" && p.Cmdline[1] == "ring1" {396// we've potentially found a workspacekit process, and expect it's one child to a be a supervisor process397if len(p.Children) > 0 {398c := p.Children[0]399400if isSupervisor(c.Cmdline) {401// we've found the corresponding supervisor process - hence the original process must be a workspace402p.Workspace = extractWorkspaceFromWorkspacekit(proc, p.PID)403404if p.Workspace != nil {405// we have actually found a workspace, but extractWorkspaceFromWorkspacekit sets the PID of the workspace406// to the PID we extracted that data from, i.e. workspacekit. We want the workspace PID to point to the407// supervisor process, so that when we kill that process we hit supervisor, not workspacekit.408p.Workspace.PID = c.PID409p.Kind = ProcessSandbox410c.Kind = ProcessSupervisor411}412}413}414}415} else if isSupervisor(p.Cmdline) {416p.Kind = ProcessSupervisor417} else {418p.Kind = ProcessUserWorkload419}420421for _, c := range p.Children {422findWorkspaces(proc, c, d+1, p.Workspace)423}424}425426func isSupervisor(cmdline []string) bool {427return len(cmdline) == 2 && cmdline[0] == "supervisor" && cmdline[1] == "init"428}429430func extractWorkspaceFromWorkspacekit(proc discoverableProcFS, pid int) *common.Workspace {431env, err := proc.Environ(pid)432if err != nil {433log.WithError(err).Debug("cannot get environment from process - might have missed a workspace")434return nil435}436var (437ownerID, workspaceID, instanceID string438gitURL string439)440for _, e := range env {441if strings.HasPrefix(e, "GITPOD_OWNER_ID=") {442ownerID = strings.TrimPrefix(e, "GITPOD_OWNER_ID=")443continue444}445if strings.HasPrefix(e, "GITPOD_WORKSPACE_ID=") {446workspaceID = strings.TrimPrefix(e, "GITPOD_WORKSPACE_ID=")447continue448}449if strings.HasPrefix(e, "GITPOD_INSTANCE_ID=") {450instanceID = strings.TrimPrefix(e, "GITPOD_INSTANCE_ID=")451continue452}453if strings.HasPrefix(e, "GITPOD_WORKSPACE_CONTEXT_URL=") {454gitURL = strings.TrimPrefix(e, "GITPOD_WORKSPACE_CONTEXT_URL=")455continue456}457}458return &common.Workspace{459OwnerID: ownerID,460WorkspaceID: workspaceID,461InstanceID: instanceID,462GitURL: gitURL,463PID: pid,464}465}466467// DiscoverProcesses starts process discovery. Must not be called more than once.468func (det *ProcfsDetector) DiscoverProcesses(ctx context.Context) (<-chan Process, error) {469det.mu.Lock()470defer det.mu.Unlock()471472if det.ps != nil {473return nil, fmt.Errorf("already discovering processes")474}475res := make(chan Process, 100)476det.ps = res477det.startOnce.Do(det.start)478479return res, nil480}481482483