Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/logs/logs.go
4095 views
1
// Package logs implements logs support for the Grafana Agent.
2
package logs
3
4
import (
5
"fmt"
6
"os"
7
"path/filepath"
8
"sync"
9
"time"
10
_ "time/tzdata" // embed timezone data
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/grafana/loki/clients/pkg/promtail"
16
"github.com/grafana/loki/clients/pkg/promtail/api"
17
"github.com/grafana/loki/clients/pkg/promtail/client"
18
"github.com/grafana/loki/clients/pkg/promtail/config"
19
"github.com/grafana/loki/clients/pkg/promtail/server"
20
"github.com/grafana/loki/clients/pkg/promtail/targets/file"
21
"github.com/grafana/loki/clients/pkg/promtail/wal"
22
"github.com/grafana/loki/pkg/tracing"
23
"github.com/prometheus/client_golang/prometheus"
24
"github.com/prometheus/common/version"
25
)
26
27
func init() {
28
client.UserAgent = fmt.Sprintf("GrafanaAgent/%s", version.Version)
29
}
30
31
// Logs is a Logs log collection. It uses multiple distinct sets of Logs
32
// Promtail agents to collect logs and send them to a Logs server.
33
type Logs struct {
34
mut sync.Mutex
35
36
reg prometheus.Registerer
37
l log.Logger
38
instances map[string]*Instance
39
}
40
41
// New creates and starts Loki log collection.
42
func New(reg prometheus.Registerer, c *Config, l log.Logger, dryRun bool) (*Logs, error) {
43
logs := &Logs{
44
instances: make(map[string]*Instance),
45
reg: reg,
46
l: log.With(l, "component", "logs"),
47
}
48
if err := logs.ApplyConfig(c, dryRun); err != nil {
49
return nil, err
50
}
51
return logs, nil
52
}
53
54
// ApplyConfig updates Logs with a new Config.
55
func (l *Logs) ApplyConfig(c *Config, dryRun bool) error {
56
l.mut.Lock()
57
defer l.mut.Unlock()
58
59
if c == nil {
60
c = &Config{}
61
}
62
63
newInstances := make(map[string]*Instance, len(c.Configs))
64
65
for _, ic := range c.Configs {
66
// If an old instance existed, update it and move it to the new map.
67
if old, ok := l.instances[ic.Name]; ok {
68
err := old.ApplyConfig(ic, c.Global, dryRun)
69
if err != nil {
70
return err
71
}
72
73
newInstances[ic.Name] = old
74
continue
75
}
76
77
inst, err := NewInstance(l.reg, ic, c.Global, l.l, dryRun)
78
if err != nil {
79
return fmt.Errorf("unable to apply config for %s: %w", ic.Name, err)
80
}
81
newInstances[ic.Name] = inst
82
}
83
84
// Any promtail in l.instances that isn't in newInstances has been removed
85
// from the config. Stop them before replacing the map.
86
for key, i := range l.instances {
87
if _, exist := newInstances[key]; exist {
88
continue
89
}
90
i.Stop()
91
}
92
l.instances = newInstances
93
94
return nil
95
}
96
97
// Stop stops the log collector.
98
func (l *Logs) Stop() {
99
l.mut.Lock()
100
defer l.mut.Unlock()
101
102
for _, i := range l.instances {
103
i.Stop()
104
}
105
}
106
107
// Instance is used to retrieve a named Logs instance
108
func (l *Logs) Instance(name string) *Instance {
109
l.mut.Lock()
110
defer l.mut.Unlock()
111
112
return l.instances[name]
113
}
114
115
// Instance is an individual Logs instance.
116
type Instance struct {
117
mut sync.Mutex
118
119
cfg *InstanceConfig
120
log log.Logger
121
reg *util.Unregisterer
122
123
promtail *promtail.Promtail
124
}
125
126
// NewInstance creates and starts a Logs instance.
127
func NewInstance(reg prometheus.Registerer, c *InstanceConfig, g GlobalConfig, l log.Logger, dryRun bool) (*Instance, error) {
128
instReg := prometheus.WrapRegistererWith(prometheus.Labels{"logs_config": c.Name}, reg)
129
130
inst := Instance{
131
reg: util.WrapWithUnregisterer(instReg),
132
log: log.With(l, "logs_config", c.Name),
133
}
134
if err := inst.ApplyConfig(c, g, dryRun); err != nil {
135
return nil, err
136
}
137
return &inst, nil
138
}
139
140
// ApplyConfig will apply a new InstanceConfig. If the config hasn't changed,
141
// then nothing will happen, otherwise the old Promtail will be stopped and
142
// then replaced with a new one.
143
func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) error {
144
i.mut.Lock()
145
defer i.mut.Unlock()
146
147
// No-op if the configs haven't changed.
148
if util.CompareYAML(c, i.cfg) {
149
level.Debug(i.log).Log("msg", "instance config hasn't changed, not recreating Promtail")
150
return nil
151
}
152
i.cfg = c
153
154
positionsDir := filepath.Dir(c.PositionsConfig.PositionsFile)
155
err := os.MkdirAll(positionsDir, 0775)
156
if err != nil {
157
level.Warn(i.log).Log("msg", "failed to create the positions directory. logs may be unable to save their position", "path", positionsDir, "err", err)
158
}
159
160
if i.promtail != nil {
161
i.promtail.Shutdown()
162
i.promtail = nil
163
}
164
165
// Unregister all existing metrics before trying to create a new instance.
166
if !i.reg.UnregisterAll() {
167
// If UnregisterAll fails, we need to abort, otherwise the new promtail
168
// would try to re-register an existing metric and might panic.
169
return fmt.Errorf("failed to unregister all metrics from previous promtail. THIS IS A BUG")
170
}
171
172
if len(c.ClientConfigs) == 0 {
173
level.Debug(i.log).Log("msg", "skipping creation of a promtail because no client_configs are present")
174
return nil
175
}
176
177
clientMetrics := client.NewMetrics(i.reg)
178
p, err := promtail.New(config.Config{
179
Global: config.GlobalConfig{FileWatch: file.WatchConfig{
180
MinPollFrequency: g.FileWatch.MinPollFrequency,
181
MaxPollFrequency: g.FileWatch.MaxPollFrequency,
182
}},
183
ServerConfig: server.Config{Disable: true},
184
ClientConfigs: c.ClientConfigs,
185
PositionsConfig: c.PositionsConfig,
186
ScrapeConfig: c.ScrapeConfig,
187
TargetConfig: c.TargetConfig,
188
LimitsConfig: c.LimitsConfig,
189
Tracing: tracing.Config{Enabled: false},
190
WAL: wal.Config{Enabled: false},
191
}, nil, clientMetrics, dryRun, promtail.WithLogger(i.log), promtail.WithRegisterer(i.reg))
192
if err != nil {
193
return fmt.Errorf("unable to create logs instance: %w", err)
194
}
195
196
i.promtail = p
197
return nil
198
}
199
200
// SendEntry passes an entry to the internal promtail client and returns true if successfully sent. It is
201
// best effort and not guaranteed to succeed.
202
func (i *Instance) SendEntry(entry api.Entry, dur time.Duration) bool {
203
i.mut.Lock()
204
defer i.mut.Unlock()
205
206
// promtail is nil it has been stopped
207
if i.promtail != nil {
208
// send non blocking so we don't block the mutex. this is best effort
209
select {
210
case i.promtail.Client().Chan() <- entry:
211
return true
212
case <-time.After(dur):
213
}
214
}
215
216
return false
217
}
218
219
// Stop stops the Promtail instance.
220
func (i *Instance) Stop() {
221
i.mut.Lock()
222
defer i.mut.Unlock()
223
224
if i.promtail != nil {
225
i.promtail.Shutdown()
226
i.promtail = nil
227
}
228
i.reg.UnregisterAll()
229
}
230
231