Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/tracing/tracing.go
4094 views
1
// Package tracing implements the tracing subsystem of Grafana Agent Flow. The
2
// tracing subsystem exposes a [trace.TracerProvider] which accepts traces and
3
// forwards them to a running component for further processing.
4
package tracing
5
6
import (
7
"context"
8
"sync"
9
"time"
10
11
"github.com/grafana/agent/component/otelcol"
12
"github.com/grafana/agent/pkg/build"
13
"github.com/grafana/agent/pkg/flow/tracing/internal/jaegerremote"
14
"github.com/grafana/agent/pkg/river"
15
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
16
"go.opentelemetry.io/otel/sdk/resource"
17
tracesdk "go.opentelemetry.io/otel/sdk/trace"
18
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
19
"go.opentelemetry.io/otel/trace"
20
)
21
22
// serviceName is the service name attached to the trace resource and passed
// to the Jaeger remote sampler when one is configured.
const serviceName = "grafana-agent"
23
24
// Defaults for all Options structs.
25
var (
26
DefaultOptions = Options{
27
SamplingFraction: 0.1, // Keep 10% of spans
28
WriteTo: []otelcol.Consumer{}, // Don't send spans anywhere.
29
}
30
31
DefaultJaegerRemoteSamplerOptions = JaegerRemoteSamplerOptions{
32
URL: "http://127.0.0.1:5778/sampling",
33
MaxOperations: 256,
34
RefreshInterval: time.Minute,
35
}
36
)
37
38
// Options control the tracing subsystem.
39
type Options struct {
40
// SamplingFraction determines which rate of traces to sample. A value of 1
41
// means to keep 100% of traces. A value of 0 means to keep 0% of traces.
42
SamplingFraction float64 `river:"sampling_fraction,attr,optional"`
43
44
// Sampler holds optional samplers to configure on top of the sampling
45
// fraction.
46
Sampler SamplerOptions `river:"sampler,block,optional"`
47
48
// WriteTo holds a set of OpenTelemetry Collector consumers where internal
49
// traces should be sent.
50
WriteTo []otelcol.Consumer `river:"write_to,attr,optional"`
51
}
52
53
type SamplerOptions struct {
54
JaegerRemote *JaegerRemoteSamplerOptions `river:"jaeger_remote,block,optional"`
55
56
// TODO(rfratto): if support for another sampler is added, SamplerOptions
57
// must enforce that only one inner block is provided.
58
}
59
60
// JaegerRemoteSamplerOptions configures the Jaeger remote sampler.
type JaegerRemoteSamplerOptions struct {
	// URL is the sampling server URL handed to the remote sampler.
	URL string `river:"url,attr,optional"`

	// MaxOperations is forwarded to the remote sampler's max-operations
	// setting.
	MaxOperations int `river:"max_operations,attr,optional"`

	// RefreshInterval is forwarded to the remote sampler's sampling refresh
	// interval setting.
	RefreshInterval time.Duration `river:"refresh_interval,attr,optional"`
}
65
66
// Implementations to apply defaults and perform validations
67
var (
68
_ river.Unmarshaler = (*Options)(nil)
69
_ river.Unmarshaler = (*JaegerRemoteSamplerOptions)(nil)
70
)
71
72
// UnmarshalRiver implements river.Unmarshaler.
73
func (opts *Options) UnmarshalRiver(f func(interface{}) error) error {
74
*opts = DefaultOptions
75
76
type options Options
77
return f((*options)(opts))
78
}
79
80
// UnmarshalRiver implements river.Unmarshaler.
81
func (opts *JaegerRemoteSamplerOptions) UnmarshalRiver(f func(interface{}) error) error {
82
*opts = DefaultJaegerRemoteSamplerOptions
83
84
type options JaegerRemoteSamplerOptions
85
return f((*options)(opts))
86
}
87
88
// Tracer is the tracing subsystem of Grafana Agent Flow. It implements
89
// [trace.TracerProvider] and can be used to forward internally generated
90
// traces to a OpenTelemetry Collector-compatible Flow component.
91
type Tracer struct {
92
sampler *lazySampler
93
client *client
94
exp *otlptrace.Exporter
95
tp *tracesdk.TracerProvider
96
97
samplerMut sync.Mutex
98
jaegerRemoteSampler *jaegerremote.Sampler // In-use jaeger remote sampler (may be nil).
99
}
100
101
// Compile-time check that *Tracer implements trace.TracerProvider.
var _ trace.TracerProvider = (*Tracer)(nil)
102
103
// New creates a new tracing subsystem. Call Run to start the tracing
104
// subsystem.
105
func New(cfg Options) (*Tracer, error) {
106
res, err := resource.New(
107
context.Background(),
108
resource.WithSchemaURL(semconv.SchemaURL),
109
resource.WithAttributes(
110
semconv.ServiceNameKey.String(serviceName),
111
semconv.ServiceVersionKey.String(build.Version),
112
),
113
resource.WithProcessRuntimeDescription(),
114
resource.WithTelemetrySDK(),
115
)
116
if err != nil {
117
return nil, err
118
}
119
120
// Create a lazy sampler and pre-seed it with the sampling fraction.
121
var sampler lazySampler
122
sampler.SetSampler(tracesdk.TraceIDRatioBased(cfg.SamplingFraction))
123
124
shimClient := &client{}
125
exp := otlptrace.NewUnstarted(shimClient)
126
127
tp := tracesdk.NewTracerProvider(
128
tracesdk.WithBatcher(exp),
129
tracesdk.WithSampler(tracesdk.ParentBased(&sampler)),
130
tracesdk.WithResource(res),
131
)
132
133
t := &Tracer{
134
sampler: &sampler,
135
client: shimClient,
136
exp: exp,
137
tp: tp,
138
}
139
140
if err := t.Update(cfg); err != nil {
141
return nil, err
142
}
143
return t, nil
144
}
145
146
// Update provides a new config to the tracing subsystem.
147
func (t *Tracer) Update(opts Options) error {
148
t.samplerMut.Lock()
149
defer t.samplerMut.Unlock()
150
151
t.client.UpdateWriteTo(opts.WriteTo)
152
153
// Stop the previous instance of the Jaeger remote sampler if it exists. The
154
// sampler can still make sampling decisions after being closed; it just
155
// won't poll anymore.
156
if t.jaegerRemoteSampler != nil {
157
t.jaegerRemoteSampler.Close()
158
t.jaegerRemoteSampler = nil
159
}
160
161
// Remote samplers accept a "seed" sampler to use before the remote is
162
// available. Get the current sampler from the previous iteration.
163
lastSampler := t.sampler.Sampler()
164
165
switch {
166
case opts.Sampler.JaegerRemote != nil:
167
t.jaegerRemoteSampler = jaegerremote.New(
168
serviceName,
169
jaegerremote.WithSamplingServerURL(opts.Sampler.JaegerRemote.URL),
170
jaegerremote.WithSamplingRefreshInterval(opts.Sampler.JaegerRemote.RefreshInterval),
171
jaegerremote.WithMaxOperations(opts.Sampler.JaegerRemote.MaxOperations),
172
jaegerremote.WithInitialSampler(lastSampler),
173
)
174
175
t.sampler.SetSampler(t.jaegerRemoteSampler)
176
177
default:
178
t.sampler.SetSampler(tracesdk.TraceIDRatioBased(opts.SamplingFraction))
179
}
180
181
return nil
182
}
183
184
// Run starts the tracing subsystem and runs it until the provided context is
185
// canceled. If the tracing subsystem could not be started, an error is
186
// returned.
187
//
188
// Run returns no error upon normal shutdown.
189
func (t *Tracer) Run(ctx context.Context) error {
190
if err := t.exp.Start(ctx); err != nil {
191
return err
192
}
193
194
<-ctx.Done()
195
196
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
197
defer cancel()
198
if err := t.tp.Shutdown(shutdownCtx); err != nil {
199
return err
200
}
201
return nil
202
}
203
204
// Tracer implements [trace.TracerProvider] by delegating to the underlying
// SDK tracer provider.
func (t *Tracer) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
	provider := t.tp
	return provider.Tracer(name, options...)
}
207
208