package agentctl

import (
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/wlog"
)

// WALStats stores statistics on the whole WAL.
type WALStats struct {
	// From holds the first timestamp for the oldest sample found within the WAL.
	// It is the zero time.Time when the WAL contains no samples.
	From time.Time

	// To holds the last timestamp for the newest sample found within the WAL.
	// It is the zero time.Time when the WAL contains no samples.
	To time.Time

	// CheckpointNumber is the segment number of the most recently created
	// checkpoint.
	CheckpointNumber int

	// FirstSegment is the segment number of the first (oldest) non-checkpoint
	// segment file found within the WAL folder.
	FirstSegment int

	// LastSegment is the segment number of the last (newest) non-checkpoint
	// segment file found within the WAL folder.
	LastSegment int

	// InvalidRefs is the number of samples with a ref ID to which no series
	// had been defined at the point the sample was read.
	InvalidRefs int

	// HashCollisions is the total number of times there has been a hash
	// collision. A hash collision is any instance in which a hash of labels
	// is defined by two ref IDs.
	//
	// For the Grafana Agent, a hash collision has no negative side effects
	// on data sent to the remote_write endpoint but may have a noticeable impact
	// on memory while the collision exists.
	HashCollisions int

	// Targets holds stats on specific scrape targets.
	Targets []WALTargetStats
}

// Series returns the number of series across all targets.
func (s WALStats) Series() int {
	var series int
	for _, t := range s.Targets {
		series += t.Series
	}
	return series
}

// Samples returns the number of Samples across all targets.
func (s WALStats) Samples() int {
	var samples int
	for _, t := range s.Targets {
		samples += t.Samples
	}
	return samples
}

// WALTargetStats aggregates statistics on scrape targets across the entirety
// of the WAL and its checkpoints.
type WALTargetStats struct {
	// Job corresponds to the "job" label on the scraped target.
	Job string

	// Instance corresponds to the "instance" label on the scraped target.
	Instance string

	// Series is the total number of series for the scraped target. It is
	// equivalent to the total cardinality.
	Series int

	// Samples is the total number of samples for the scraped target.
	Samples int
}

// CalculateStats calculates the statistics of the WAL for the given directory.
// walDir must be a folder containing segment files and checkpoint directories.
func CalculateStats(walDir string) (WALStats, error) {
	// NOTE(review): wlog.Open opens the WAL through its writer type; confirm it
	// does not create a new segment that would be reflected in LastSegment.
	w, err := wlog.Open(nil, walDir)
	if err != nil {
		return WALStats{}, err
	}
	// The Close error is deliberately ignored: the statistics have already
	// been fully computed by the time the deferred Close runs.
	defer w.Close()

	return newWALStatsCalculator(w).Calculate()
}

type walStatsCalculator struct {
	w *wlog.WL

	// fromTime and toTime track the minimum and maximum sample timestamps
	// seen so far. They start at opposite extremes so the first sample read
	// sets both; fromTime > toTime therefore means no sample was seen.
	fromTime    int64
	toTime      int64
	invalidRefs int

	// stats holds per-target stats in the order targets were first seen.
	stats []*WALTargetStats

	// targetLookup indexes stats by {job, instance} label values so series
	// records do not require a scan over every known target.
	targetLookup map[[2]string]*WALTargetStats

	// statsLookup maps a series ref ID to the stats of its target.
	statsLookup map[chunks.HeadSeriesRef]*WALTargetStats

	// hash -> # ref IDs with that hash
	hashInstances map[uint64]int
}

func newWALStatsCalculator(w *wlog.WL) *walStatsCalculator {
	return &walStatsCalculator{
		w:             w,
		fromTime:      math.MaxInt64,
		toTime:        math.MinInt64,
		targetLookup:  make(map[[2]string]*WALTargetStats),
		statsLookup:   make(map[chunks.HeadSeriesRef]*WALTargetStats),
		hashInstances: make(map[uint64]int),
	}
}

// Calculate walks the WAL and its latest checkpoint and returns the collected
// statistics. On error the returned WALStats is the zero value.
func (c *walStatsCalculator) Calculate() (WALStats, error) {
	_, checkpointIdx, err := wlog.LastCheckpoint(c.w.Dir())
	// A missing checkpoint is not an error: a WAL may simply not have one yet.
	if err != nil && !errors.Is(err, record.ErrNotFound) {
		return WALStats{}, fmt.Errorf("finding last checkpoint: %w", err)
	}

	firstSegment, lastSegment, err := wlog.Segments(c.w.Dir())
	if err != nil {
		return WALStats{}, fmt.Errorf("listing WAL segments: %w", err)
	}

	// Iterate over the WAL and collect stats. This must be done before the
	// rest of the fields are filled in, as readWAL populates the internal
	// state they are derived from.
	if err := walIterate(c.w, c.readWAL); err != nil {
		return WALStats{}, fmt.Errorf("reading WAL: %w", err)
	}

	stats := WALStats{
		CheckpointNumber: checkpointIdx,
		FirstSegment:     firstSegment,
		LastSegment:      lastSegment,
		InvalidRefs:      c.invalidRefs,
	}

	// Only report a time range if at least one sample was read; otherwise the
	// sentinel extremes would be converted into nonsensical timestamps.
	if c.fromTime <= c.toTime {
		stats.From = timestamp.Time(c.fromTime)
		stats.To = timestamp.Time(c.toTime)
	}

	for _, hashCount := range c.hashInstances {
		if hashCount > 1 {
			stats.HashCollisions++
		}
	}

	for _, tgt := range c.stats {
		stats.Targets = append(stats.Targets, *tgt)
	}

	return stats, nil
}

// readWAL consumes every record from r, updating the calculator's per-target
// counters, timestamp range, invalid-ref count and label-hash counts.
func (c *walStatsCalculator) readWAL(r *wlog.Reader) error {
	var (
		dec     record.Decoder
		series  []record.RefSeries
		samples []record.RefSample
		err     error
	)

	for r.Next() {
		rec := r.Record()

		// We ignore other record types here; we only write records and samples
		// but we don't want to return an error for an unexpected record type;
		// doing so would prevent users from getting stats on a traditional
		// Prometheus WAL, which would be nice to support.
		switch dec.Type(rec) {
		case record.Series:
			// Reuse the previous record's backing array; decoded entries are
			// only used within this iteration.
			series, err = dec.Series(rec, series[:0])
			if err != nil {
				return fmt.Errorf("decoding series record: %w", err)
			}
			for _, s := range series {
				stats := c.targetStats(s.Labels.Get("job"), s.Labels.Get("instance"))

				// Every time we get a new series, we want to increment the series
				// count for the specific job/instance pair, store the ref ID so
				// samples can modify the stats, and then store the hash of our
				// labels to detect collisions (or flapping series).
				stats.Series++
				c.statsLookup[s.Ref] = stats
				c.hashInstances[s.Labels.Hash()]++
			}
		case record.Samples:
			samples, err = dec.Samples(rec, samples[:0])
			if err != nil {
				return fmt.Errorf("decoding samples record: %w", err)
			}
			for _, s := range samples {
				// Samples contribute to the time range even when their ref is
				// unknown.
				c.fromTime = min(c.fromTime, s.T)
				c.toTime = max(c.toTime, s.T)

				stats := c.statsLookup[s.Ref]
				if stats == nil {
					c.invalidRefs++
					continue
				}
				stats.Samples++
			}
		}
	}

	return r.Err()
}

// targetStats returns the WALTargetStats for the given job/instance pair,
// creating and registering it (preserving first-seen order) if it does not
// exist yet.
func (c *walStatsCalculator) targetStats(job, instance string) *WALTargetStats {
	if c.targetLookup == nil {
		c.targetLookup = make(map[[2]string]*WALTargetStats)
	}

	key := [2]string{job, instance}
	if stats, ok := c.targetLookup[key]; ok {
		return stats
	}

	stats := &WALTargetStats{Job: job, Instance: instance}
	c.stats = append(c.stats, stats)
	c.targetLookup[key] = stats
	return stats
}

// BySeriesCount can sort a slice of target stats by the count of
// series. The slice is sorted in descending order.
type BySeriesCount []WALTargetStats

func (s BySeriesCount) Len() int           { return len(s) }
func (s BySeriesCount) Less(i, j int) bool { return s[i].Series > s[j].Series }
func (s BySeriesCount) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }