package instance
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)
var (
	// instanceAbnormalExits counts, per instance name, how often an instance's
	// Run returned an error other than context.Canceled and was scheduled for a
	// restart (incremented in runProcess). promauto registers it with the
	// default Prometheus registerer when the package is initialized.
	instanceAbnormalExits = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "agent_metrics_instance_abnormal_exits_total",
		Help: "Total number of times a Prometheus instance exited unexpectedly, causing it to be restarted.",
	}, []string{"instance_name"})

	// currentActiveInstances is incremented by ApplyConfig after a process is
	// spawned and decremented by the spawnProcess goroutine once that process
	// has stopped.
	currentActiveInstances = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "agent_metrics_active_instances",
		Help: "Current number of active instances being used by the agent.",
	})

	// DefaultBasicManagerConfig holds the default settings for a BasicManager:
	// a failed instance is restarted after a 5 second backoff.
	DefaultBasicManagerConfig = BasicManagerConfig{
		InstanceRestartBackoff: 5 * time.Second,
	}
)
// Manager owns a set of ManagedInstances and the Configs they were created
// from. BasicManager, in this file, keys both by Config.Name.
type Manager interface {
	// GetInstance returns the instance registered under name, or an error when
	// there is none.
	GetInstance(name string) (ManagedInstance, error)

	// ListInstances returns a snapshot of the managed instances keyed by name.
	ListInstances() map[string]ManagedInstance

	// ListConfigs returns a snapshot of the managed configs keyed by name.
	ListConfigs() map[string]Config

	// ApplyConfig creates the instance described by c, or updates the existing
	// instance with the same name.
	ApplyConfig(c Config) error

	// DeleteConfig stops the instance registered under name.
	DeleteConfig(name string) error

	// Stop stops every managed instance.
	Stop()
}
// ManagedInstance is a single runnable unit controlled by a Manager.
type ManagedInstance interface {
	// Run executes the instance until ctx is canceled or it fails. BasicManager
	// treats a nil error or context.Canceled as a clean stop and restarts the
	// instance after a backoff on any other error.
	Run(ctx context.Context) error

	// Ready reports whether the instance is ready (semantics are defined by
	// the implementation).
	Ready() bool

	// Update applies c to the running instance. BasicManager treats an error
	// matching ErrInvalidUpdate as "restart required" and any other error as
	// a failed update.
	Update(c Config) error

	// TargetsActive returns the instance's active scrape targets; presumably
	// keyed by scrape job name — confirm against implementations.
	TargetsActive() map[string][]*scrape.Target

	// StorageDirectory returns the directory the instance stores data in.
	StorageDirectory() string

	// Appender returns a storage.Appender bound to ctx.
	Appender(ctx context.Context) storage.Appender
}
// BasicManagerConfig controls the runtime behavior of a BasicManager. It can
// be swapped at runtime via BasicManager.UpdateManagerConfig.
type BasicManagerConfig struct {
	// InstanceRestartBackoff is how long runProcess waits before restarting an
	// instance whose Run returned an abnormal error.
	InstanceRestartBackoff time.Duration
}
// BasicManager is a Manager that runs each instance in its own goroutine,
// restarting it after a backoff when it exits abnormally.
type BasicManager struct {
	// cfgMut guards cfg, which UpdateManagerConfig may replace at runtime.
	cfgMut sync.Mutex
	cfg    BasicManagerConfig
	logger log.Logger

	// mut guards processes, the running processes keyed by Config.Name.
	mut       sync.Mutex
	processes map[string]*managedProcess

	// launch builds a new ManagedInstance for a Config (used by spawnProcess).
	launch Factory
}
// managedProcess is the bookkeeping BasicManager keeps for one running
// instance.
type managedProcess struct {
	// cfg is the config most recently applied to inst.
	cfg  Config
	inst ManagedInstance

	// cancel cancels the context passed to inst.Run.
	cancel context.CancelFunc
	// done is closed by the spawnProcess goroutine once runProcess returns.
	done chan bool
}
// Stop cancels the process's run context and blocks until the goroutine
// running it has closed done. Calling it more than once is safe: cancel may be
// invoked repeatedly and a closed channel can be received from any number of
// times.
func (mp managedProcess) Stop() {
	cancelRun, finished := mp.cancel, mp.done
	cancelRun()
	<-finished
}
// Factory builds a new ManagedInstance from a Config.
type Factory func(c Config) (ManagedInstance, error)

// NewBasicManager returns a BasicManager with no running processes. New
// instances are built with launch whenever a config is applied that cannot
// be handled by updating an existing instance.
func NewBasicManager(cfg BasicManagerConfig, logger log.Logger, launch Factory) *BasicManager {
	mgr := &BasicManager{
		cfg:    cfg,
		logger: logger,
		launch: launch,
	}
	mgr.processes = make(map[string]*managedProcess)
	return mgr
}
// UpdateManagerConfig replaces the manager's settings. Restart backoffs read
// after this call (via instanceRestartBackoff) use the new value.
func (m *BasicManager) UpdateManagerConfig(c BasicManagerConfig) {
	m.cfgMut.Lock()
	m.cfg = c
	m.cfgMut.Unlock()
}
// GetInstance returns the instance currently registered under name. It
// returns an error if no process with that name is tracked.
func (m *BasicManager) GetInstance(name string) (ManagedInstance, error) {
	m.mut.Lock()
	defer m.mut.Unlock()

	if proc, found := m.processes[name]; found {
		return proc.inst, nil
	}
	return nil, fmt.Errorf("instance %s does not exist", name)
}
// ListInstances returns a snapshot copy of the managed instances keyed by
// name. Nil process entries are skipped.
func (m *BasicManager) ListInstances() map[string]ManagedInstance {
	m.mut.Lock()
	defer m.mut.Unlock()

	instances := make(map[string]ManagedInstance, len(m.processes))
	for key, proc := range m.processes {
		if proc != nil {
			instances[key] = proc.inst
		}
	}
	return instances
}
// ListConfigs returns a snapshot copy of the config each managed process is
// currently running with, keyed by name.
func (m *BasicManager) ListConfigs() map[string]Config {
	m.mut.Lock()
	defer m.mut.Unlock()

	configs := make(map[string]Config, len(m.processes))
	for key := range m.processes {
		configs[key] = m.processes[key].cfg
	}
	return configs
}
// ApplyConfig creates a new instance for c or updates the existing instance
// named c.Name.
//
// If an instance already exists, it is first asked to apply c in place. When
// Update reports ErrInvalidUpdate, the old process is stopped and a fresh one
// is spawned for c. Any other Update error is returned and the existing
// process is left running untouched.
func (m *BasicManager) ApplyConfig(c Config) error {
	m.mut.Lock()
	defer m.mut.Unlock()

	if existing, found := m.processes[c.Name]; found {
		updateErr := existing.inst.Update(c)
		switch {
		case errors.Is(updateErr, ErrInvalidUpdate{}):
			level.Info(m.logger).Log("msg", "could not dynamically update instance, will manually restart", "instance", c.Name, "reason", updateErr)
			// Stopping while holding m.mut is safe: the spawnProcess goroutine
			// closes done before it tries to take m.mut. Execution then falls
			// through to spawn the replacement process.
			existing.Stop()
		case updateErr != nil:
			return fmt.Errorf("failed to update instance %s: %w", c.Name, updateErr)
		default:
			level.Info(m.logger).Log("msg", "dynamically updated instance", "instance", c.Name)
			existing.cfg = c
			return nil
		}
	}

	if err := m.spawnProcess(c); err != nil {
		return err
	}
	currentActiveInstances.Inc()
	return nil
}
// spawnProcess builds a new instance for c, registers it under c.Name and runs
// it in a background goroutine. The caller must hold m.mut.
//
// When the instance stops for good, the goroutine closes the process's done
// channel. It then removes the map entry, but only if the entry still refers
// to this exact process; a later ApplyConfig may already have replaced it.
// Finally it decrements currentActiveInstances.
func (m *BasicManager) spawnProcess(c Config) error {
	inst, err := m.launch(c)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)

	proc := &managedProcess{
		cancel: cancel,
		done:   done,
		cfg:    c,
		inst:   inst,
	}
	m.processes[c.Name] = proc

	go func() {
		m.runProcess(ctx, c.Name, inst)

		// done must be closed before taking m.mut: ApplyConfig calls Stop (which
		// waits on done) while holding m.mut, so the reverse order would
		// deadlock.
		close(done)

		// Compare the process pointer rather than the instance. The pointer is
		// stable across dynamic config updates (ApplyConfig mutates proc.cfg in
		// place). Unlike an interface comparison, it also cannot panic on a
		// non-comparable instance type. Nor does it misfire when the Factory
		// returns the same instance value for a replacement process.
		m.mut.Lock()
		if storedProc, exist := m.processes[c.Name]; exist && storedProc == proc {
			delete(m.processes, c.Name)
		}
		m.mut.Unlock()

		currentActiveInstances.Dec()
	}()

	return nil
}
// runProcess runs inst until it exits cleanly or ctx is canceled.
//
// A nil error or context.Canceled (possibly wrapped) from Run is treated as a
// deliberate stop. Any other error is counted in instanceAbnormalExits and the
// instance is restarted after the configured backoff. The backoff wait is
// interruptible: if ctx is canceled while waiting, runProcess returns
// immediately instead of sleeping out the period and calling Run again.
func (m *BasicManager) runProcess(ctx context.Context, name string, inst ManagedInstance) {
	for {
		err := inst.Run(ctx)
		if err == nil || errors.Is(err, context.Canceled) {
			level.Info(m.logger).Log("msg", "stopped instance", "instance", name)
			return
		}

		backoff := m.instanceRestartBackoff()
		instanceAbnormalExits.WithLabelValues(name).Inc()
		level.Error(m.logger).Log("msg", "instance stopped abnormally, restarting after backoff period", "err", err, "backoff", backoff, "instance", name)

		// Without watching ctx here, managedProcess.Stop would block for the
		// full backoff. It would block forever if Run kept returning a
		// non-cancellation error once ctx is done.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			level.Info(m.logger).Log("msg", "stopped instance", "instance", name)
			return
		case <-timer.C:
		}
	}
}
// instanceRestartBackoff returns the currently configured restart backoff,
// reading it under cfgMut so it can change concurrently via
// UpdateManagerConfig.
func (m *BasicManager) instanceRestartBackoff() time.Duration {
	m.cfgMut.Lock()
	backoff := m.cfg.InstanceRestartBackoff
	m.cfgMut.Unlock()
	return backoff
}
// DeleteConfig stops the process registered under name and removes it from
// the manager before returning. It returns an error if no such process exists.
//
// The goroutine started by spawnProcess also removes the entry, but only after
// it has closed done and re-acquired m.mut. Without the explicit removal
// below, there is a window after DeleteConfig returns in which the stopped
// process is still listed. During that window, a following ApplyConfig could
// "dynamically update" the dead instance and report success, and the entry
// would then vanish, silently dropping that config.
func (m *BasicManager) DeleteConfig(name string) error {
	m.mut.Lock()
	proc, ok := m.processes[name]
	m.mut.Unlock()
	if !ok {
		return errors.New("config does not exist")
	}

	proc.Stop()

	// Only remove the entry if it still refers to the process we stopped; a
	// concurrent ApplyConfig may have installed a replacement in the meantime.
	m.mut.Lock()
	if current, exists := m.processes[name]; exists && current == proc {
		delete(m.processes, name)
	}
	m.mut.Unlock()
	return nil
}
// Stop stops every managed process concurrently and blocks until all of them
// have exited. m.mut is held only while the stop goroutines are launched.
func (m *BasicManager) Stop() {
	var pending sync.WaitGroup

	m.mut.Lock()
	for _, p := range m.processes {
		pending.Add(1)
		go func(p *managedProcess) {
			defer pending.Done()
			p.Stop()
		}(p)
	}
	m.mut.Unlock()

	pending.Wait()
}
// MockManager is a Manager whose behavior is supplied by function fields.
// Calling a method whose corresponding field is nil panics.
type MockManager struct {
	GetInstanceFunc   func(name string) (ManagedInstance, error)
	ListInstancesFunc func() map[string]ManagedInstance
	ListConfigsFunc   func() map[string]Config
	ApplyConfigFunc   func(Config) error
	DeleteConfigFunc  func(name string) error
	StopFunc          func()
}

// GetInstance delegates to GetInstanceFunc.
func (m MockManager) GetInstance(name string) (ManagedInstance, error) {
	if m.GetInstanceFunc == nil {
		panic("GetInstanceFunc not implemented")
	}
	return m.GetInstanceFunc(name)
}

// ListInstances delegates to ListInstancesFunc.
func (m MockManager) ListInstances() map[string]ManagedInstance {
	if m.ListInstancesFunc == nil {
		panic("ListInstancesFunc not implemented")
	}
	return m.ListInstancesFunc()
}

// ListConfigs delegates to ListConfigsFunc.
func (m MockManager) ListConfigs() map[string]Config {
	if m.ListConfigsFunc == nil {
		panic("ListConfigsFunc not implemented")
	}
	return m.ListConfigsFunc()
}

// ApplyConfig delegates to ApplyConfigFunc.
func (m MockManager) ApplyConfig(c Config) error {
	if m.ApplyConfigFunc == nil {
		panic("ApplyConfigFunc not implemented")
	}
	return m.ApplyConfigFunc(c)
}

// DeleteConfig delegates to DeleteConfigFunc.
func (m MockManager) DeleteConfig(name string) error {
	if m.DeleteConfigFunc == nil {
		panic("DeleteConfigFunc not implemented")
	}
	return m.DeleteConfigFunc(name)
}

// Stop delegates to StopFunc.
func (m MockManager) Stop() {
	if m.StopFunc == nil {
		panic("StopFunc not implemented")
	}
	m.StopFunc()
}