GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/docker/internal/dockertarget/target.go
package dockertarget

// This code is copied from Promtail. The dockertarget package is used to
// configure and run the targets that can read logs from Docker containers and
// forward them to other loki components.

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"time"

	docker_types "github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	"go.uber.org/atomic"
)

const (
	// See github.com/prometheus/prometheus/discovery/moby
	dockerLabel                = model.MetaLabelPrefix + "docker_"
	dockerLabelContainerPrefix = dockerLabel + "container_"
	dockerLabelLogStream       = dockerLabelContainerPrefix + "log_stream"
)

// Target enables reading Docker container logs.
type Target struct {
	logger        log.Logger
	handler       loki.EntryHandler
	since         int64
	positions     positions.Positions
	containerName string
	labels        model.LabelSet
	relabelConfig []*relabel.Config
	metrics       *Metrics

	cancel  context.CancelFunc
	client  client.APIClient
	wg      sync.WaitGroup
	running *atomic.Bool
	err     error
}

// NewTarget starts a new target to read logs from a given container ID.
func NewTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, position positions.Positions, containerID string, labels model.LabelSet, relabelConfig []*relabel.Config, client client.APIClient) (*Target, error) {
	pos, err := position.Get(positions.CursorKey(containerID), labels.String())
	if err != nil {
		return nil, err
	}
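	// Resume from the stored read position for this container, if one exists.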
	var since int64
	if pos != 0 {
		since = pos
	}

	t := &Target{
		logger:        logger,
		handler:       handler,
		since:         since,
		positions:     position,
		containerName: containerID,
		labels:        labels,
		relabelConfig: relabelConfig,
		metrics:       metrics,

		client:  client,
		running: atomic.NewBool(false),
	}

	// NOTE (@tpaschalis) The original Promtail implementation would call
	// t.StartIfNotRunning() right here to start tailing.
	// We manage targets from a task's Run method.
	return t, nil
}

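// processLoop opens a streaming (follow) logs request for the container and
// keeps forwarding its output until ctx is cancelled. Output from non-TTY
// containers is demultiplexed into separate stdout and stderr streams.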
func (t *Target) processLoop(ctx context.Context) {
	t.running.Store(true)
	defer t.running.Store(false)

	t.wg.Add(1)
	defer t.wg.Done()

	opts := docker_types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Follow:     true,
		Timestamps: true,
		Since:      strconv.FormatInt(t.since, 10),
	}
	inspectInfo, err := t.client.ContainerInspect(ctx, t.containerName)
	if err != nil {
		level.Error(t.logger).Log("msg", "could not inspect container info", "container", t.containerName, "err", err)
		t.err = err
		return
	}
	logs, err := t.client.ContainerLogs(ctx, t.containerName, opts)
	if err != nil {
		level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerName, "err", err)
		t.err = err
		return
	}

	// Start transferring
	rstdout, wstdout := io.Pipe()
	rstderr, wstderr := io.Pipe()
	t.wg.Add(1)
	go func() {
		defer func() {
			t.wg.Done()
			wstdout.Close()
			wstderr.Close()
			t.Stop()
		}()
		var written int64
		var err error
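		// TTY containers expose a single raw stream, while non-TTY containers
		// multiplex stdout and stderr over one connection; stdcopy splits the
		// latter back into the two pipes.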
		if inspectInfo.Config.Tty {
			written, err = io.Copy(wstdout, logs)
		} else {
			written, err = stdcopy.StdCopy(wstdout, wstderr, logs)
		}
		if err != nil {
			level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
		} else {
			level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerName)
		}
	}()

	// Start processing
	t.wg.Add(2)
	go t.process(rstdout, "stdout")
	go t.process(rstderr, "stderr")

	// Wait until done
	<-ctx.Done()
	logs.Close()
	level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName)
}

// extractTs tries to read the timestamp from the beginning of the log line.
// It's expected to follow the format 2006-01-02T15:04:05.999999999Z07:00.
func extractTs(line string) (time.Time, string, error) {
	pair := strings.SplitN(line, " ", 2)
	if len(pair) != 2 {
		return time.Now(), line, fmt.Errorf("Could not find timestamp in '%s'", line)
	}
	ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
	if err != nil {
		return time.Now(), line, fmt.Errorf("Could not parse timestamp from '%s': %w", pair[0], err)
	}
	return ts, pair[1], nil
}

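// readLine reads one full log line from r, stitching together the chunks that
// bufio.Reader.ReadLine returns when a line exceeds the reader's buffer.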
// https://devmarkpro.com/working-big-files-golang
func readLine(r *bufio.Reader) (string, error) {
	var (
		isPrefix = true
		err      error
		line, ln []byte
	)

	for isPrefix && err == nil {
		line, isPrefix, err = r.ReadLine()
		ln = append(ln, line...)
	}

	return string(ln), err
}

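// process consumes log lines from r, extracts their timestamps, applies the
// relabeling rules, forwards the resulting entries to the handler and records
// the last read timestamp in the positions file.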
func (t *Target) process(r io.Reader, logStream string) {
	defer func() {
		t.wg.Done()
	}()

	reader := bufio.NewReader(r)
	for {
		line, err := readLine(reader)
		if err != nil {
			if err == io.EOF {
				break
			}
			level.Error(t.logger).Log("msg", "error reading docker log line, skipping line", "err", err)
			t.metrics.dockerErrors.Inc()
		}

		ts, line, err := extractTs(line)
		if err != nil {
			level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
			t.metrics.dockerErrors.Inc()
			continue
		}

		// Add all labels from the config, relabel and filter them.
		lb := labels.NewBuilder(nil)
		for k, v := range t.labels {
			lb.Set(string(k), string(v))
		}
		lb.Set(dockerLabelLogStream, logStream)
		processed, _ := relabel.Process(lb.Labels(nil), t.relabelConfig...)

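		// Drop internal labels (those prefixed with "__") before shipping the
		// entry.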
		filtered := make(model.LabelSet)
		for _, lbl := range processed {
			if strings.HasPrefix(lbl.Name, "__") {
				continue
			}
			filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
		}

		t.handler.Chan() <- loki.Entry{
			Labels: filtered,
			Entry: logproto.Entry{
				Timestamp: ts,
				Line:      line,
			},
		}
		t.metrics.dockerEntries.Inc()

		// NOTE(@tpaschalis) We don't save the positions entry with the
		// filtered labels, but with the default label set, as this is the one
		// used to find the original read offset from the client. This might be
		// problematic if we have the same container with a different set of
		// labels (e.g. duplicated and relabeled), but this shouldn't be the
		// case anyway.
		t.positions.Put(positions.CursorKey(t.containerName), t.labels.String(), ts.Unix())
	}
}

// StartIfNotRunning starts processing container logs. The operation is
// idempotent, i.e. the processing cannot be started twice.
func (t *Target) StartIfNotRunning() {
	if t.running.CompareAndSwap(false, true) {
		level.Debug(t.logger).Log("msg", "starting process loop", "container", t.containerName)
		ctx, cancel := context.WithCancel(context.Background())
		t.cancel = cancel
		go t.processLoop(ctx)
	} else {
		level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName)
	}
}

// Stop shuts down the target.
func (t *Target) Stop() {
	t.cancel()
	t.wg.Wait()
	level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerName)
}

// Ready reports whether the target is running.
func (t *Target) Ready() bool {
	return t.running.Load()
}

// Labels reports the target's labels.
func (t *Target) Labels() model.LabelSet {
	return t.labels
}

// Name reports the container name.
func (t *Target) Name() string {
	return t.containerName
}

// Hash is used when comparing targets in tasks.
func (t *Target) Hash() uint64 {
	return uint64(t.labels.Fingerprint())
}

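// Path returns the container name.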
func (t *Target) Path() string {
	return t.containerName
}

// Details returns target-specific details.
func (t *Target) Details() interface{} {
	var errMsg string
	if t.err != nil {
		errMsg = t.err.Error()
	}
	return map[string]string{
		"id":       t.containerName,
		"error":    errMsg,
		"position": t.positions.GetString(positions.CursorKey(t.containerName), t.labels.String()),
		"running":  strconv.FormatBool(t.running.Load()),
	}
}