package logs
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
_ "time/tzdata"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/loki/clients/pkg/promtail"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/client"
"github.com/grafana/loki/clients/pkg/promtail/config"
"github.com/grafana/loki/clients/pkg/promtail/server"
"github.com/grafana/loki/clients/pkg/promtail/targets/file"
"github.com/grafana/loki/clients/pkg/promtail/wal"
"github.com/grafana/loki/pkg/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
)
// init stamps the promtail client's user agent with the agent's build
// version so requests sent by this package identify themselves as
// "GrafanaAgent/<version>".
func init() {
	agentVersion := version.Version
	client.UserAgent = fmt.Sprintf("GrafanaAgent/%s", agentVersion)
}
// Logs owns a set of named Instances and keeps them in sync with the
// configuration passed to ApplyConfig.
type Logs struct {
// mut is held by ApplyConfig, Stop and Instance while they access instances.
mut sync.Mutex
// reg is the registerer handed to NewInstance when an instance is created.
reg prometheus.Registerer
// l is the logger (tagged component=logs by New) passed to new instances.
l log.Logger
// instances maps an instance config's Name to the Instance built from it.
instances map[string]*Instance
}
// New builds a Logs that registers metrics on reg and logs through l (tagged
// with component=logs), then applies c to it. If applying the initial config
// fails, the error is returned and no Logs is handed back.
func New(reg prometheus.Registerer, c *Config, l log.Logger, dryRun bool) (*Logs, error) {
	logs := new(Logs)
	logs.reg = reg
	logs.l = log.With(l, "component", "logs")
	logs.instances = map[string]*Instance{}

	err := logs.ApplyConfig(c, dryRun)
	if err != nil {
		return nil, err
	}
	return logs, nil
}
// ApplyConfig reconciles the set of running instances with c.
//
// A nil c is treated as an empty Config, which stops every instance. For each
// entry in c.Configs, an existing instance with the same Name is updated in
// place; otherwise a new instance is created. Instances whose Name no longer
// appears in c.Configs are stopped once all entries have been applied
// successfully.
//
// If any entry fails to apply, an error naming that entry is returned, the
// current instance set is left in place, and every instance that was created
// during this call is stopped so that it does not keep running (and holding
// its metric registrations) without an owner. Existing instances that were
// already updated earlier in the same call keep their new config.
func (l *Logs) ApplyConfig(c *Config, dryRun bool) error {
	l.mut.Lock()
	defer l.mut.Unlock()

	if c == nil {
		c = &Config{}
	}

	newInstances := make(map[string]*Instance, len(c.Configs))

	// created tracks instances built by this call. They are not reachable
	// from l.instances yet, so on failure nobody else could ever stop them.
	var created []*Instance
	rollback := func() {
		for _, inst := range created {
			inst.Stop()
		}
	}

	for _, ic := range c.Configs {
		// Reuse an already-running instance with the same name.
		if old, ok := l.instances[ic.Name]; ok {
			if err := old.ApplyConfig(ic, c.Global, dryRun); err != nil {
				rollback()
				return fmt.Errorf("unable to apply config for %s: %w", ic.Name, err)
			}
			newInstances[ic.Name] = old
			continue
		}

		inst, err := NewInstance(l.reg, ic, c.Global, l.l, dryRun)
		if err != nil {
			rollback()
			return fmt.Errorf("unable to apply config for %s: %w", ic.Name, err)
		}
		created = append(created, inst)
		newInstances[ic.Name] = inst
	}

	// Anything left in l.instances that did not make it into newInstances was
	// removed from the config; stop it before swapping the maps.
	for key, i := range l.instances {
		if _, exist := newInstances[key]; exist {
			continue
		}
		i.Stop()
	}
	l.instances = newInstances

	return nil
}
// Stop stops every instance currently held by l. The instances stay in the
// map afterwards.
func (l *Logs) Stop() {
	l.mut.Lock()
	defer l.mut.Unlock()

	for name := range l.instances {
		l.instances[name].Stop()
	}
}
// Instance looks up the instance registered under name. It returns nil when
// no instance with that name exists.
func (l *Logs) Instance(name string) *Instance {
	l.mut.Lock()
	defer l.mut.Unlock()

	inst, found := l.instances[name]
	if !found {
		return nil
	}
	return inst
}
// Instance wraps a single Promtail built from an InstanceConfig.
type Instance struct {
// mut is held by ApplyConfig, SendEntry and Stop for their whole duration.
mut sync.Mutex
// cfg is the config ApplyConfig compares incoming configs against to decide
// whether the Promtail needs to be recreated.
cfg *InstanceConfig
// log is the logger tagged with this instance's logs_config name.
log log.Logger
// reg is the registerer passed to the Promtail; UnregisterAll is called on
// it before a Promtail is replaced and when the instance is stopped.
reg *util.Unregisterer
// promtail is the running Promtail, or nil when none has been created
// (for example when the config has no client_configs) or after Stop.
promtail *promtail.Promtail
}
// NewInstance creates an Instance for c and applies c and g to it. Metrics
// registered through the instance carry a logs_config=<c.Name> label, and the
// instance's logger is tagged with the same key/value. An error from applying
// the config is returned as-is and no Instance is handed back.
func NewInstance(reg prometheus.Registerer, c *InstanceConfig, g GlobalConfig, l log.Logger, dryRun bool) (*Instance, error) {
	labels := prometheus.Labels{"logs_config": c.Name}
	labelled := prometheus.WrapRegistererWith(labels, reg)

	inst := &Instance{
		log: log.With(l, "logs_config", c.Name),
		reg: util.WrapWithUnregisterer(labelled),
	}

	err := inst.ApplyConfig(c, g, dryRun)
	if err != nil {
		return nil, err
	}
	return inst, nil
}
// ApplyConfig replaces the instance's Promtail with one built from c and g.
//
// When util.CompareYAML reports that c matches the config currently in
// effect, nothing is changed. NOTE(review): only c takes part in that
// comparison, so a change to g alone does not trigger a rebuild.
//
// Otherwise the running Promtail (if any) is shut down, all metrics
// registered through the instance's registerer are removed, and a new
// Promtail is created. When c has no client configs, no Promtail is created
// and the instance stays idle.
//
// The applied config is only recorded once the instance has fully reached
// the state c describes. If unregistering metrics or creating the Promtail
// fails, an error is returned and a later call with the same c will try
// again instead of being treated as "unchanged".
func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) error {
	i.mut.Lock()
	defer i.mut.Unlock()

	// No-op if the config is the one already in effect.
	if util.CompareYAML(c, i.cfg) {
		level.Debug(i.log).Log("msg", "instance config hasn't changed, not recreating Promtail")
		return nil
	}

	// Failing to create the positions directory is not fatal: log it and
	// carry on.
	positionsDir := filepath.Dir(c.PositionsConfig.PositionsFile)
	if err := os.MkdirAll(positionsDir, 0775); err != nil {
		level.Warn(i.log).Log("msg", "failed to create the positions directory. logs may be unable to save their position", "path", positionsDir, "err", err)
	}

	if i.promtail != nil {
		i.promtail.Shutdown()
		i.promtail = nil
	}
	// From here on the previous config is no longer in effect. Forget it so
	// that a failure below cannot make a retry look like "no change".
	i.cfg = nil

	// The old Promtail's metrics must be gone before a new one registers the
	// same collectors.
	if !i.reg.UnregisterAll() {
		return fmt.Errorf("failed to unregister all metrics from previous promtail. THIS IS A BUG")
	}

	if len(c.ClientConfigs) == 0 {
		level.Debug(i.log).Log("msg", "skipping creation of a promtail because no client_configs are present")
		i.cfg = c
		return nil
	}

	clientMetrics := client.NewMetrics(i.reg)
	p, err := promtail.New(config.Config{
		Global: config.GlobalConfig{FileWatch: file.WatchConfig{
			MinPollFrequency: g.FileWatch.MinPollFrequency,
			MaxPollFrequency: g.FileWatch.MaxPollFrequency,
		}},
		ServerConfig:    server.Config{Disable: true},
		ClientConfigs:   c.ClientConfigs,
		PositionsConfig: c.PositionsConfig,
		ScrapeConfig:    c.ScrapeConfig,
		TargetConfig:    c.TargetConfig,
		LimitsConfig:    c.LimitsConfig,
		Tracing:         tracing.Config{Enabled: false},
		WAL:             wal.Config{Enabled: false},
	}, nil, clientMetrics, dryRun, promtail.WithLogger(i.log), promtail.WithRegisterer(i.reg))
	if err != nil {
		// Best-effort removal of whatever was registered for the Promtail
		// that never came up (at least the client metrics above). The
		// result is ignored because the next ApplyConfig checks it again.
		_ = i.reg.UnregisterAll()
		return fmt.Errorf("unable to create logs instance: %w", err)
	}

	i.promtail = p
	i.cfg = c
	return nil
}
// SendEntry tries to hand entry to the instance's Promtail client, waiting
// at most dur for the client's channel to accept it.
//
// It returns true when the entry was accepted, and false when the instance
// has no Promtail or the timeout elapsed first. The instance mutex is held
// for the whole wait, so ApplyConfig and Stop block until SendEntry returns.
func (i *Instance) SendEntry(entry api.Entry, dur time.Duration) bool {
	i.mut.Lock()
	defer i.mut.Unlock()

	if i.promtail == nil {
		return false
	}

	// Use an explicit timer rather than time.After so it can be released as
	// soon as the send succeeds instead of lingering until dur has elapsed.
	timeout := time.NewTimer(dur)
	defer timeout.Stop()

	select {
	case i.promtail.Client().Chan() <- entry:
		return true
	case <-timeout.C:
		return false
	}
}
// Stop shuts down the instance's Promtail, if one is running, and then
// removes every metric registered through the instance's registerer. The
// outcome of the unregistration is not reported.
func (i *Instance) Stop() {
	i.mut.Lock()
	defer i.mut.Unlock()

	if running := i.promtail; running != nil {
		running.Shutdown()
		i.promtail = nil
	}

	i.reg.UnregisterAll()
}