Path: blob/main/cmd/internal/flowmode/resources_collector.go
4094 views
package flowmode12import (3"os"4"time"56"github.com/go-kit/log"7"github.com/go-kit/log/level"8"github.com/prometheus/client_golang/prometheus"9"github.com/shirou/gopsutil/v3/net"10"github.com/shirou/gopsutil/v3/process"11)1213// resourcesCollector is a prometheus.Collector which exposes process-level14// statistics. It is similar to the process collector in15// github.com/prometheus/client_golang but includes support for more platforms.16type resourcesCollector struct {17log log.Logger1819processStartTime *prometheus.Desc20cpuTotal *prometheus.Desc21rssMemory *prometheus.Desc22virtMemory *prometheus.Desc23rxBytes *prometheus.Desc24txBytes *prometheus.Desc25}2627var _ prometheus.Collector = (*resourcesCollector)(nil)2829// newResourcesCollector creates a new resourcesCollector.30func newResourcesCollector(l log.Logger) *resourcesCollector {31rc := &resourcesCollector{32log: l,3334processStartTime: prometheus.NewDesc(35"agent_resources_process_start_time_seconds",36"Start time of the process since Unix epoch in seconds.",37nil, nil,38),3940cpuTotal: prometheus.NewDesc(41"agent_resources_process_cpu_seconds_total",42"Total user and system CPU time spent in seconds.",43nil, nil,44),4546rssMemory: prometheus.NewDesc(47"agent_resources_process_resident_memory_bytes",48"Current resident memory size in bytes.",49nil, nil,50),5152virtMemory: prometheus.NewDesc(53"agent_resources_process_virtual_memory_bytes",54"Current virtual memory size in bytes.",55nil, nil,56),5758rxBytes: prometheus.NewDesc(59"agent_resources_machine_rx_bytes_total",60"Total bytes, host-wide, received across all network interfaces.",61nil, nil,62),6364txBytes: prometheus.NewDesc(65"agent_resources_machine_tx_bytes_total",66"Total bytes, host-wide, sent across all given network interface.",67nil, nil,68),69}7071return rc72}7374func (rc *resourcesCollector) Describe(ch chan<- *prometheus.Desc) {75ch <- rc.processStartTime76ch <- rc.cpuTotal77ch <- rc.rssMemory78ch <- rc.virtMemory79ch <- rc.rxBytes80ch <- rc.txBytes81}8283func (rc *resourcesCollector) Collect(ch chan<- prometheus.Metric) {84proc, err := process.NewProcess(int32(os.Getpid()))85if err != nil {86level.Error(rc.log).Log("msg", "failed to get process", "err", err)87return88}8990if t, err := proc.CreateTime(); err != nil {91rc.reportError(rc.processStartTime, err)92} else {93dur := time.Duration(t) * time.Millisecond9495ch <- prometheus.MustNewConstMetric(96rc.processStartTime,97prometheus.GaugeValue,98dur.Seconds(),99)100}101102if ts, err := proc.Times(); err != nil {103rc.reportError(rc.cpuTotal, err)104} else {105ch <- prometheus.MustNewConstMetric(106rc.cpuTotal,107prometheus.CounterValue,108ts.User+ts.System,109)110}111112if mi, err := proc.MemoryInfo(); err != nil {113rc.reportError(rc.virtMemory, err)114rc.reportError(rc.rssMemory, err)115} else {116ch <- prometheus.MustNewConstMetric(117rc.virtMemory,118prometheus.GaugeValue,119float64(mi.VMS),120)121122ch <- prometheus.MustNewConstMetric(123rc.rssMemory,124prometheus.GaugeValue,125float64(mi.RSS),126)127}128129if counters, err := net.IOCounters(true); err != nil {130rc.reportError(rc.rxBytes, err)131rc.reportError(rc.txBytes, err)132} else {133var rxBytes, txByes uint64134135for _, counter := range counters {136rxBytes += counter.BytesRecv137txByes += counter.BytesSent138}139140ch <- prometheus.MustNewConstMetric(141rc.rxBytes,142prometheus.CounterValue,143float64(rxBytes),144)145146ch <- prometheus.MustNewConstMetric(147rc.txBytes,148prometheus.CounterValue,149float64(txByes),150)151}152}153154func (rc *resourcesCollector) reportError(d *prometheus.Desc, err error) {155level.Error(rc.log).Log("msg", "failed to collect resources metric", "name", d.String(), "err", err)156}157158159