Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/docker/docker.go
4096 views
1
package docker
2
3
import (
4
"context"
5
"fmt"
6
"os"
7
"path/filepath"
8
"reflect"
9
"sync"
10
"time"
11
12
"github.com/docker/docker/client"
13
"github.com/go-kit/log"
14
"github.com/go-kit/log/level"
15
"github.com/grafana/agent/component"
16
"github.com/grafana/agent/component/common/loki"
17
"github.com/grafana/agent/component/common/loki/positions"
18
flow_relabel "github.com/grafana/agent/component/common/relabel"
19
"github.com/grafana/agent/component/discovery"
20
dt "github.com/grafana/agent/component/loki/source/docker/internal/dockertarget"
21
"github.com/prometheus/common/model"
22
"github.com/prometheus/prometheus/model/relabel"
23
)
24
25
func init() {
26
component.Register(component.Registration{
27
Name: "loki.source.docker",
28
Args: Arguments{},
29
30
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
31
return New(opts, args.(Arguments))
32
},
33
})
34
}
35
36
const (
37
dockerLabel = model.MetaLabelPrefix + "docker_"
38
dockerLabelContainerPrefix = dockerLabel + "container_"
39
dockerLabelContainerID = dockerLabelContainerPrefix + "id"
40
)
41
42
// Arguments holds values which are used to configure the loki.source.docker
43
// component.
44
type Arguments struct {
45
Host string `river:"host,attr"`
46
Targets []discovery.Target `river:"targets,attr"`
47
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
48
Labels map[string]string `river:"labels,attr,optional"`
49
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
50
}
51
52
var (
53
_ component.Component = (*Component)(nil)
54
_ component.DebugComponent = (*Component)(nil)
55
)
56
57
// Component implements the loki.source.file component.
58
type Component struct {
59
opts component.Options
60
metrics *dt.Metrics
61
62
mut sync.RWMutex
63
args Arguments
64
manager *manager
65
lastOptions *options
66
handler loki.LogsReceiver
67
posFile positions.Positions
68
rcs []*relabel.Config
69
defaultLabels model.LabelSet
70
71
receiversMut sync.RWMutex
72
receivers []loki.LogsReceiver
73
}
74
75
// New creates a new loki.source.file component.
76
func New(o component.Options, args Arguments) (*Component, error) {
77
err := os.MkdirAll(o.DataPath, 0750)
78
if err != nil && !os.IsExist(err) {
79
return nil, err
80
}
81
positionsFile, err := positions.New(o.Logger, positions.Config{
82
SyncPeriod: 10 * time.Second,
83
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
84
IgnoreInvalidYaml: false,
85
ReadOnly: false,
86
})
87
if err != nil {
88
return nil, err
89
}
90
91
c := &Component{
92
opts: o,
93
metrics: dt.NewMetrics(o.Registerer),
94
95
handler: make(loki.LogsReceiver),
96
manager: newManager(o.Logger, nil),
97
receivers: args.ForwardTo,
98
posFile: positionsFile,
99
}
100
101
// Call to Update() to start readers and set receivers once at the start.
102
if err := c.Update(args); err != nil {
103
return nil, err
104
}
105
106
return c, nil
107
}
108
109
// Run implements component.Component.
110
func (c *Component) Run(ctx context.Context) error {
111
defer c.posFile.Stop()
112
113
defer func() {
114
c.mut.Lock()
115
defer c.mut.Unlock()
116
117
// Guard for safety, but it's not possible for Run to be called without
118
// c.tailer being initialized.
119
if c.manager != nil {
120
c.manager.stop()
121
}
122
}()
123
124
for {
125
select {
126
case <-ctx.Done():
127
return nil
128
case entry := <-c.handler:
129
c.receiversMut.RLock()
130
receivers := c.receivers
131
c.receiversMut.RUnlock()
132
for _, receiver := range receivers {
133
receiver <- entry
134
}
135
}
136
}
137
}
138
139
// Update implements component.Component.
140
func (c *Component) Update(args component.Arguments) error {
141
newArgs := args.(Arguments)
142
143
// Update the receivers before anything else, just in case something fails.
144
c.receiversMut.Lock()
145
c.receivers = newArgs.ForwardTo
146
c.receiversMut.Unlock()
147
148
c.mut.Lock()
149
defer c.mut.Unlock()
150
151
managerOpts, err := c.getManagerOptions(newArgs)
152
if err != nil {
153
return err
154
}
155
156
if managerOpts != c.lastOptions {
157
// Options changed; pass it to the tailer.
158
// This will never fail because it only fails if the context gets canceled.
159
_ = c.manager.updateOptions(context.Background(), managerOpts)
160
c.lastOptions = managerOpts
161
}
162
163
defaultLabels := make(model.LabelSet, len(newArgs.Labels))
164
for k, v := range newArgs.Labels {
165
defaultLabels[model.LabelName(k)] = model.LabelValue(v)
166
}
167
c.defaultLabels = defaultLabels
168
169
if newArgs.RelabelRules != nil && len(newArgs.RelabelRules) > 0 {
170
c.rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
171
} else {
172
c.rcs = []*relabel.Config{}
173
}
174
175
// Convert input targets into targets to give to tailer.
176
targets := make([]*dt.Target, 0, len(newArgs.Targets))
177
178
for _, target := range newArgs.Targets {
179
containerID, ok := target[dockerLabelContainerID]
180
if !ok {
181
level.Debug(c.opts.Logger).Log("msg", "docker target did not include container ID label:"+dockerLabelContainerID)
182
continue
183
}
184
185
var labels = make(model.LabelSet)
186
for k, v := range target {
187
labels[model.LabelName(k)] = model.LabelValue(v)
188
}
189
190
tgt, err := dt.NewTarget(
191
c.metrics,
192
log.With(c.opts.Logger, "target", fmt.Sprintf("docker/%s", containerID)),
193
c.manager.opts.handler,
194
c.manager.opts.positions,
195
containerID,
196
labels.Merge(c.defaultLabels),
197
c.rcs,
198
c.manager.opts.client,
199
)
200
if err != nil {
201
return err
202
}
203
targets = append(targets, tgt)
204
205
// This will never fail because it only fails if the context gets canceled.
206
_ = c.manager.syncTargets(context.Background(), targets)
207
}
208
209
c.args = newArgs
210
return nil
211
}
212
213
// getTailerOptions gets tailer options from arguments. If args hasn't changed
214
// from the last call to getTailerOptions, c.lastOptions is returned.
215
// c.lastOptions must be updated by the caller.
216
//
217
// getTailerOptions must only be called when c.mut is held.
218
func (c *Component) getManagerOptions(args Arguments) (*options, error) {
219
if reflect.DeepEqual(c.args.Host, args.Host) && c.lastOptions != nil {
220
return c.lastOptions, nil
221
}
222
223
opts := []client.Opt{
224
client.WithHost(args.Host),
225
client.WithAPIVersionNegotiation(),
226
}
227
client, err := client.NewClientWithOpts(opts...)
228
if err != nil {
229
level.Error(c.opts.Logger).Log("msg", "could not create new Docker client", "err", err)
230
return c.lastOptions, fmt.Errorf("failed to build docker client: %w", err)
231
}
232
233
return &options{
234
client: client,
235
handler: loki.NewEntryHandler(c.handler, func() {}),
236
positions: c.posFile,
237
}, nil
238
}
239
240
// DebugInfo returns information about the status of tailed targets.
241
func (c *Component) DebugInfo() interface{} {
242
var res readerDebugInfo
243
for _, tgt := range c.manager.targets() {
244
details := tgt.Details().(map[string]string)
245
res.TargetsInfo = append(res.TargetsInfo, targetInfo{
246
Labels: tgt.Labels().String(),
247
ID: details["id"],
248
LastError: details["error"],
249
IsRunning: details["running"],
250
ReadOffset: details["position"],
251
})
252
}
253
return res
254
}
255
256
type readerDebugInfo struct {
257
TargetsInfo []targetInfo `river:"targets_info,block"`
258
}
259
260
// targetInfo describes a single target in the component's debug output. All
// fields are plain strings; DebugInfo fills them from the target's labels and
// details map.
type targetInfo struct {
	// ID is the "id" entry of the target's details.
	ID string `river:"id,attr"`
	// LastError is the "error" entry of the target's details.
	LastError string `river:"last_error,attr"`
	// Labels is the string form of the target's label set.
	Labels string `river:"labels,attr"`
	// IsRunning is the "running" entry of the target's details.
	IsRunning string `river:"is_running,attr"`
	// ReadOffset is the "position" entry of the target's details.
	ReadOffset string `river:"read_offset,attr"`
}
267
268