package tracing
import (
"context"
"sync"
"time"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/flow/tracing/internal/jaegerremote"
"github.com/grafana/agent/pkg/river"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
)
// serviceName is the service name attached to the tracing resource created in
// New and passed to the Jaeger remote sampler created in Update.
const serviceName = "grafana-agent"
// Default values which UnmarshalRiver assigns before decoding, so that any
// attribute omitted from the configuration keeps the value listed here.
var (
// DefaultOptions holds the default values for Options.
DefaultOptions = Options{
SamplingFraction: 0.1,
WriteTo: []otelcol.Consumer{},
}
// DefaultJaegerRemoteSamplerOptions holds the default values for
// JaegerRemoteSamplerOptions.
DefaultJaegerRemoteSamplerOptions = JaegerRemoteSamplerOptions{
URL: "http://127.0.0.1:5778/sampling",
MaxOperations: 256,
RefreshInterval: time.Minute,
}
)
// Options configures a Tracer. It is accepted by New and by Tracer.Update.
type Options struct {
// SamplingFraction is the ratio handed to tracesdk.TraceIDRatioBased. It is
// the sampler used when no Jaeger remote sampler is configured.
SamplingFraction float64 `river:"sampling_fraction,attr,optional"`
// Sampler selects an alternative sampling strategy; see SamplerOptions.
Sampler SamplerOptions `river:"sampler,block,optional"`
// WriteTo is the list of consumers forwarded to the exporter client on
// every Update.
WriteTo []otelcol.Consumer `river:"write_to,attr,optional"`
}
// SamplerOptions selects the sampler used by a Tracer.
type SamplerOptions struct {
// JaegerRemote, when non-nil, makes Tracer.Update install a Jaeger remote
// sampler instead of the ratio-based sampler.
JaegerRemote *JaegerRemoteSamplerOptions `river:"jaeger_remote,block,optional"`
}
// JaegerRemoteSamplerOptions configures the Jaeger remote sampler. Each field
// is forwarded to the matching jaegerremote option in Tracer.Update.
type JaegerRemoteSamplerOptions struct {
// URL is passed to jaegerremote.WithSamplingServerURL.
URL string `river:"url,attr,optional"`
// MaxOperations is passed to jaegerremote.WithMaxOperations.
MaxOperations int `river:"max_operations,attr,optional"`
// RefreshInterval is passed to jaegerremote.WithSamplingRefreshInterval.
RefreshInterval time.Duration `river:"refresh_interval,attr,optional"`
}
// Compile-time checks that both option types implement river.Unmarshaler.
var (
_ river.Unmarshaler = (*Options)(nil)
_ river.Unmarshaler = (*JaegerRemoteSamplerOptions)(nil)
)
// UnmarshalRiver implements river.Unmarshaler. It first overwrites opts with
// DefaultOptions and then lets f decode on top of it, so attributes that are
// absent from the configuration keep their defaults.
func (opts *Options) UnmarshalRiver(f func(interface{}) error) error {
	// options has the same fields as Options but none of its methods;
	// presumably this keeps f from re-entering UnmarshalRiver.
	type options Options

	*opts = DefaultOptions

	target := (*options)(opts)
	return f(target)
}
// UnmarshalRiver implements river.Unmarshaler. It first overwrites opts with
// DefaultJaegerRemoteSamplerOptions and then lets f decode on top of it, so
// attributes that are absent from the configuration keep their defaults.
func (opts *JaegerRemoteSamplerOptions) UnmarshalRiver(f func(interface{}) error) error {
	// options has the same fields as JaegerRemoteSamplerOptions but none of
	// its methods; presumably this keeps f from re-entering UnmarshalRiver.
	type options JaegerRemoteSamplerOptions

	*opts = DefaultJaegerRemoteSamplerOptions

	target := (*options)(opts)
	return f(target)
}
// Tracer is the tracing subsystem. It satisfies trace.TracerProvider by
// delegating to an OpenTelemetry SDK tracer provider, and its sampler and
// export destinations can be changed at runtime through Update.
type Tracer struct {
// sampler is the swappable sampler installed (wrapped in ParentBased) into
// tp. Update changes the sampler it holds.
sampler *lazySampler
// client is the client the OTLP exporter was created with; Update forwards
// the configured consumers to it.
client *client
// exp is the OTLP exporter. It is created unstarted in New and started by
// Run.
exp *otlptrace.Exporter
// tp is the underlying SDK tracer provider, shut down when Run exits.
tp *tracesdk.TracerProvider
// samplerMut is held for the whole of Update.
samplerMut sync.Mutex
// jaegerRemoteSampler is the remote sampler installed by the most recent
// Update, or nil when the ratio-based sampler is in use.
jaegerRemoteSampler *jaegerremote.Sampler
}
// Compile-time check that *Tracer implements trace.TracerProvider.
var _ trace.TracerProvider = (*Tracer)(nil)
// New creates a Tracer configured by cfg.
//
// The tracer's resource carries the service name and build version, plus
// process runtime and telemetry SDK details. The OTLP exporter is created
// unstarted; call Run to start it. An error is returned if the resource
// cannot be built or if applying cfg through Update fails.
func New(cfg Options) (*Tracer, error) {
	serviceAttrs := resource.WithAttributes(
		semconv.ServiceNameKey.String(serviceName),
		semconv.ServiceVersionKey.String(build.Version),
	)
	res, err := resource.New(
		context.Background(),
		resource.WithSchemaURL(semconv.SchemaURL),
		serviceAttrs,
		resource.WithProcessRuntimeDescription(),
		resource.WithTelemetrySDK(),
	)
	if err != nil {
		return nil, err
	}

	t := &Tracer{
		sampler: new(lazySampler),
		client:  &client{},
	}

	// Seed the lazy sampler before Update runs: Update reads the current
	// sampler and hands it to the Jaeger remote sampler as its initial sampler.
	t.sampler.SetSampler(tracesdk.TraceIDRatioBased(cfg.SamplingFraction))

	t.exp = otlptrace.NewUnstarted(t.client)
	t.tp = tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(t.exp),
		tracesdk.WithSampler(tracesdk.ParentBased(t.sampler)),
		tracesdk.WithResource(res),
	)

	if err := t.Update(cfg); err != nil {
		return nil, err
	}
	return t, nil
}
// Update applies opts to the tracer: it forwards opts.WriteTo to the exporter
// client and replaces the active sampler. When opts.Sampler.JaegerRemote is
// set, a Jaeger remote sampler is installed; otherwise a ratio-based sampler
// using opts.SamplingFraction is installed. Any previously installed remote
// sampler is closed first. Update currently always returns nil.
func (t *Tracer) Update(opts Options) error {
	t.samplerMut.Lock()
	defer t.samplerMut.Unlock()

	t.client.UpdateWriteTo(opts.WriteTo)

	// Tear down the remote sampler from the previous configuration, if any,
	// before deciding which sampler to install now.
	if prev := t.jaegerRemoteSampler; prev != nil {
		prev.Close()
		t.jaegerRemoteSampler = nil
	}

	current := t.sampler.Sampler()

	remote := opts.Sampler.JaegerRemote
	if remote == nil {
		t.sampler.SetSampler(tracesdk.TraceIDRatioBased(opts.SamplingFraction))
		return nil
	}

	// The sampler that was active until now is handed to the remote sampler
	// as its initial sampler.
	t.jaegerRemoteSampler = jaegerremote.New(
		serviceName,
		jaegerremote.WithSamplingServerURL(remote.URL),
		jaegerremote.WithSamplingRefreshInterval(remote.RefreshInterval),
		jaegerremote.WithMaxOperations(remote.MaxOperations),
		jaegerremote.WithInitialSampler(current),
	)
	t.sampler.SetSampler(t.jaegerRemoteSampler)
	return nil
}
// Run starts the OTLP exporter and then blocks until ctx is canceled. If the
// exporter cannot be started, that error is returned immediately.
//
// On cancellation the tracer provider is shut down with a five-second timeout
// and the active Jaeger remote sampler, if any, is closed. The error from the
// provider shutdown (nil on a clean shutdown) is returned.
func (t *Tracer) Run(ctx context.Context) error {
	if err := t.exp.Start(ctx); err != nil {
		return err
	}

	<-ctx.Done()

	// ctx is already canceled here, so shutdown gets its own bounded context.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := t.tp.Shutdown(shutdownCtx)

	// Update closes a remote sampler before replacing it; do the same on
	// shutdown so the last one installed is not left running after Run
	// returns. This happens even when the provider shutdown failed.
	t.samplerMut.Lock()
	if t.jaegerRemoteSampler != nil {
		t.jaegerRemoteSampler.Close()
		t.jaegerRemoteSampler = nil
	}
	t.samplerMut.Unlock()

	return err
}
// Tracer implements trace.TracerProvider. It returns the tracer with the
// given name and options from the underlying SDK tracer provider.
func (t *Tracer) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
	provider := t.tp
	return provider.Tracer(name, options...)
}