Path: blob/main/pkg/metrics/instance/group_manager.go
package instance

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"sort"
	"sync"

	"github.com/prometheus/prometheus/config"
)

// A GroupManager wraps around another Manager and groups all incoming Configs
// into a smaller set of configs, causing fewer managed instances to be
// spawned.
//
// Configs are grouped by all settings for a Config *except* scrape configs.
// Any difference found in any flag will cause a Config to be placed in another
// group. One exception to this rule is that remote_writes are compared
// unordered, but the sets of remote_writes should otherwise be identical.
//
// GroupManagers drastically improve the performance of the Agent when a
// significant number of instances are spawned, as the overhead of each
// instance having its own service discovery, WAL, and remote_write can be
// significant.
//
// The config names of instances within the group will be represented by
// that group's hash of settings.
type GroupManager struct {
	inner Manager

	mtx sync.Mutex

	// groups is a map of group name to the grouped configs.
	groups map[string]groupedConfigs

	// groupLookup is a map of config name to group name.
	groupLookup map[string]string
}

// groupedConfigs holds a set of grouped configs, keyed by the config name.
// They are stored in a map rather than a slice to make overriding an existing
// config within the group less error-prone.
type groupedConfigs map[string]Config

// Copy returns a shallow copy of the groupedConfigs.
func (g groupedConfigs) Copy() groupedConfigs {
	res := make(groupedConfigs, len(g))
	for k, v := range g {
		res[k] = v
	}
	return res
}

// NewGroupManager creates a new GroupManager for combining instances of the
// same "group."
func NewGroupManager(inner Manager) *GroupManager {
	return &GroupManager{
		inner:       inner,
		groups:      make(map[string]groupedConfigs),
		groupLookup: make(map[string]string),
	}
}

// GetInstance gets the underlying grouped instance for a given name.
func (m *GroupManager) GetInstance(name string) (ManagedInstance, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	group, ok := m.groupLookup[name]
	if !ok {
		return nil, fmt.Errorf("instance %s does not exist", name)
	}

	inst, err := m.inner.GetInstance(group)
	if err != nil {
		return nil, fmt.Errorf("failed to get instance for %s: %w", name, err)
	}
	return inst, nil
}

// ListInstances returns all currently grouped managed instances. The key
// will be the group's hash of shared settings.
func (m *GroupManager) ListInstances() map[string]ManagedInstance {
	return m.inner.ListInstances()
}

// ListConfigs returns the UNGROUPED instance configs with their original
// settings. To see the grouped instances, call ListInstances instead.
func (m *GroupManager) ListConfigs() map[string]Config {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	cfgs := make(map[string]Config)
	for _, groupedConfigs := range m.groups {
		for _, cfg := range groupedConfigs {
			cfgs[cfg.Name] = cfg
		}
	}
	return cfgs
}
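
// The following usage sketch is illustrative only and not part of this file:
// it assumes two hypothetical Configs a and b that differ only in their
// scrape configs, so they hash to the same group.
//
//	gm := NewGroupManager(inner)
//	_ = gm.ApplyConfig(a)
//	_ = gm.ApplyConfig(b)
//	insts := gm.ListInstances() // one entry, keyed by the group's hash
//	cfgs := gm.ListConfigs()    // two entries, keyed by the original names
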
// ApplyConfig will determine the group of the Config before applying it to
// the group. If no group exists, one will be created. If a group already
// exists, the group will have its settings merged with the Config and
// will be updated.
func (m *GroupManager) ApplyConfig(c Config) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	return m.applyConfig(c)
}

func (m *GroupManager) applyConfig(c Config) (err error) {
	groupName, err := hashConfig(c)
	if err != nil {
		return fmt.Errorf("failed to get group name for config %s: %w", c.Name, err)
	}

	grouped := m.groups[groupName]
	if grouped == nil {
		grouped = make(groupedConfigs)
	} else {
		grouped = grouped.Copy()
	}

	// Add the config to the group. If the config already exists within this
	// group, it'll be overwritten.
	grouped[c.Name] = c
	mergedConfig, err := groupConfigs(groupName, grouped)
	if err != nil {
		err = fmt.Errorf("failed to group configs for %s: %w", c.Name, err)
		return
	}

	// If this config already exists in another group, we have to delete it.
	// If we can't delete it from the old group, we also can't apply it.
	if oldGroup, ok := m.groupLookup[c.Name]; ok && oldGroup != groupName {
		// There are a few cases here where, if something fails, it's safer to
		// crash out and restart the Agent from scratch than it would be to
		// continue as normal. The panics here are for truly exceptional cases;
		// otherwise, if something is recoverable, we'll return an error like
		// normal.

		// If we can't find the old config, something got messed up when applying
		// the config. But it also means that we're not going to be able to restore
		// the config if something fails. Preemptively we should panic, since the
		// internal state has gotten messed up and can't be fixed.
		oldConfig, ok := m.groups[oldGroup][c.Name]
		if !ok {
			panic("failed to properly move config to new group. THIS IS A BUG!")
		}

		err = m.deleteConfig(c.Name)
		if err != nil {
			err = fmt.Errorf("cannot apply config %s because deleting it from the old group failed: %w", c.Name, err)
			return
		}

		// Now that the config is deleted, we need to restore it in case applying
		// the new one happens to fail.
		defer func() {
			if err == nil {
				return
			}

			// If restoring a config fails, we've left the Agent in a really bad
			// state: the new config can't be applied and the old config can't be
			// brought back. Just crash and let the Agent start fresh.
			//
			// Restoring the config _shouldn't_ fail here since applies only fail
			// if the config is invalid. Since the config was running before, it
			// should already be valid. If it does happen to fail, though, the
			// internal state is left corrupted since we've completely lost a
			// config.
			restoreError := m.applyConfig(oldConfig)
			if restoreError != nil {
				panic(fmt.Sprintf("failed to properly restore config. THIS IS A BUG! error: %s", restoreError))
			}
		}()
	}

	err = m.inner.ApplyConfig(mergedConfig)
	if err != nil {
		err = fmt.Errorf("failed to apply grouped configs for config %s: %w", c.Name, err)
		return
	}

	// If the inner apply succeeded, we can update our group and the lookup.
	m.groups[groupName] = grouped
	m.groupLookup[c.Name] = groupName
	return
}
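
// To illustrate the flow above with a hedged sketch (config a and the field
// access are hypothetical): re-applying a Config whose non-scrape settings
// changed moves it between groups, with the deferred restore guarding
// against a failed apply of the new group.
//
//	_ = gm.ApplyConfig(a)         // hashes to group G1
//	a.RemoteWrite[0].URL = newURL // any non-scrape change alters the hash
//	_ = gm.ApplyConfig(a)         // deleted from G1, applied under G2
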
// DeleteConfig will remove a Config from its associated group. If there are
// no more Configs within that group after this Config is deleted, the managed
// instance will be stopped. Otherwise, the managed instance will be updated
// with the new grouped Config that doesn't include the removed one.
func (m *GroupManager) DeleteConfig(name string) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	return m.deleteConfig(name)
}

func (m *GroupManager) deleteConfig(name string) error {
	groupName, ok := m.groupLookup[name]
	if !ok {
		return fmt.Errorf("config does not exist")
	}

	// Grab a copy of the stored group and delete our entry. We can
	// persist it after we successfully remove the config.
	group := m.groups[groupName].Copy()
	delete(group, name)

	if len(group) == 0 {
		// We deleted the last remaining config in that group; we can delete it in
		// its entirety now.
		if err := m.inner.DeleteConfig(groupName); err != nil {
			return fmt.Errorf("failed to delete empty group %s after removing config %s: %w", groupName, name, err)
		}
	} else {
		// We deleted the config but there's still more in the group; apply the new
		// group that holds the remainder of the configs (minus the one we just
		// deleted).
		mergedConfig, err := groupConfigs(groupName, group)
		if err != nil {
			return fmt.Errorf("failed to regroup configs without %s: %w", name, err)
		}

		err = m.inner.ApplyConfig(mergedConfig)
		if err != nil {
			return fmt.Errorf("failed to apply new group without %s: %w", name, err)
		}
	}

	// Update the stored group and remove the entry from the lookup table.
	if len(group) == 0 {
		delete(m.groups, groupName)
	} else {
		m.groups[groupName] = group
	}

	delete(m.groupLookup, name)
	return nil
}

// Stop stops the Manager and all of its managed instances.
func (m *GroupManager) Stop() {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	m.inner.Stop()
	m.groupLookup = make(map[string]string)
	m.groups = make(map[string]groupedConfigs)
}
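
// A small sketch of the delete semantics, assuming a group G that currently
// holds two hypothetical configs named "a" and "b":
//
//	_ = gm.DeleteConfig("a") // G still holds b: the group is re-applied without a
//	_ = gm.DeleteConfig("b") // G is now empty: the managed instance is stopped
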
// hashConfig determines the hash of a Config used for grouping. It ignores
// the name and scrape_configs and also orders remote_writes by name prior to
// hashing.
func hashConfig(c Config) (string, error) {
	// We need a deep copy since we're going to mutate the remote_write
	// pointers.
	groupable, err := c.Clone()
	if err != nil {
		return "", err
	}

	// Ignore the name and scrape configs when hashing.
	groupable.Name = ""
	groupable.ScrapeConfigs = nil

	// Assign names to remote_write configs if they're not present already.
	// This is also done in AssignDefaults but is duplicated here for the sake
	// of simplifying the responsibility of GroupManager.
	for _, cfg := range groupable.RemoteWrite {
		if cfg != nil {
			// We don't care if the names are different, just that the other
			// settings are the same. Blank out the name here before hashing
			// the remote write config.
			cfg.Name = ""

			hash, err := getHash(cfg)
			if err != nil {
				return "", err
			}
			cfg.Name = hash[:6]
		}
	}

	// Now sort remote_writes by name and nil-ness.
	sort.Slice(groupable.RemoteWrite, func(i, j int) bool {
		switch {
		case groupable.RemoteWrite[i] == nil:
			return true
		case groupable.RemoteWrite[j] == nil:
			return false
		default:
			return groupable.RemoteWrite[i].Name < groupable.RemoteWrite[j].Name
		}
	})

	bb, err := MarshalConfig(&groupable, false)
	if err != nil {
		return "", err
	}
	hash := md5.Sum(bb)
	return hex.EncodeToString(hash[:]), nil
}

// groupConfigs creates a grouped Config where all fields are copied from
// the first config except for scrape_configs, which are appended together.
func groupConfigs(groupName string, grouped groupedConfigs) (Config, error) {
	if len(grouped) == 0 {
		return Config{}, fmt.Errorf("no configs")
	}

	// Move the map into a slice and sort it by name so this function
	// consistently does the same thing.
	cfgs := make([]Config, 0, len(grouped))
	for _, cfg := range grouped {
		cfgs = append(cfgs, cfg)
	}
	sort.Slice(cfgs, func(i, j int) bool { return cfgs[i].Name < cfgs[j].Name })

	combined, err := cfgs[0].Clone()
	if err != nil {
		return Config{}, err
	}
	combined.Name = groupName
	combined.ScrapeConfigs = []*config.ScrapeConfig{}

	// Assign all remote_write configs in the group a consistent set of remote
	// names. If the grouped configs are coming from the scraping service,
	// defaults will have been applied and the remote names will be prefixed
	// with the old instance config name.
	for _, rwc := range combined.RemoteWrite {
		// Blank out the existing name before getting the hash so it doesn't
		// take into account any existing name.
		rwc.Name = ""

		hash, err := getHash(rwc)
		if err != nil {
			return Config{}, err
		}

		rwc.Name = groupName[:6] + "-" + hash[:6]
	}

	// Combine all the scrape configs. It's possible that two different ungrouped
	// configs had a matching job name, but this will be detected and rejected
	// (as it should be) when the underlying Manager eventually validates the
	// combined config.
	//
	// TODO(rfratto): should we prepend job names with the name of the original
	// config? (e.g., job_name = "config_name/job_name").
	for _, cfg := range cfgs {
		combined.ScrapeConfigs = append(combined.ScrapeConfigs, cfg.ScrapeConfigs...)
	}

	return combined, nil
}
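
// To illustrate the grouping rule end to end (a hedged sketch; configs a and
// b are hypothetical): two Configs that differ only by name, scrape_configs,
// or the order of their remote_writes hash identically and therefore share
// one managed instance.
//
//	h1, _ := hashConfig(a) // a has remote_writes [rw1, rw2]
//	h2, _ := hashConfig(b) // b is identical but lists [rw2, rw1]
//	same := h1 == h2       // true: remote_writes are compared unordered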