Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/tracing/otelcol_client.go
4095 views
1
package tracing
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
8
"github.com/grafana/agent/component/otelcol"
9
"github.com/hashicorp/go-multierror"
10
"go.opentelemetry.io/collector/pdata/pcommon"
11
"go.opentelemetry.io/collector/pdata/ptrace"
12
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
13
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
14
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
15
"go.uber.org/atomic"
16
)
17
18
type client struct {
19
started atomic.Bool
20
21
mut sync.RWMutex
22
writeTo []otelcol.Consumer
23
}
24
25
var _ otlptrace.Client = (*client)(nil)
26
27
// UpdateWriteTo replaces the set of consumers that receive uploaded traces.
//
// The slice is stored as given, without copying, so the caller should not
// modify it after the call.
func (cli *client) UpdateWriteTo(consumers []otelcol.Consumer) {
	cli.mut.Lock()
	cli.writeTo = consumers
	cli.mut.Unlock()
}
32
33
// Start marks the client as running. It returns an error if the client is
// already running. ctx is not used.
func (cli *client) Start(ctx context.Context) error {
	if swapped := cli.started.CompareAndSwap(false, true); swapped {
		return nil
	}
	return fmt.Errorf("already started")
}
39
40
// Stop marks the client as no longer running. It returns an error if the
// client is not currently running. ctx is not used.
func (cli *client) Stop(ctx context.Context) error {
	if swapped := cli.started.CompareAndSwap(true, false); swapped {
		return nil
	}
	return fmt.Errorf("not running")
}
46
47
// UploadTraces converts protoSpans to Collector traces and hands them to
// every consumer currently registered through UpdateWriteTo.
//
// Delivery is attempted for all consumers even if some of them fail; the
// individual failures are combined into the returned error. If the client
// has not been started, the spans are dropped and nil is returned.
func (cli *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
	// Client didn't start (may be a no-op client); ignore traces.
	if running := cli.started.Load(); !running {
		return nil
	}

	// Convert before taking the lock so the conversion does not extend the
	// time the read lock is held.
	traces := protoToCollector(protoSpans)

	cli.mut.RLock()
	defer cli.mut.RUnlock()

	var result error
	for i := range cli.writeTo {
		consumer := cli.writeTo[i]

		// A consumer that mutates its input gets a private copy so the
		// remaining consumers still see the unmodified traces.
		toSend := traces
		if consumer.Capabilities().MutatesData {
			toSend = ptrace.NewTraces()
			traces.CopyTo(toSend)
		}

		err := consumer.ConsumeTraces(ctx, toSend)
		if err == nil {
			continue
		}
		result = multierror.Append(result, err)
	}

	return result
}
75
76
// protoToCollector converts OpenTelemetry SDK traces to OpenTelemetry
77
// Collector traces.
78
func protoToCollector(in []*tracepb.ResourceSpans) ptrace.Traces {
79
out := ptrace.NewTraces()
80
81
for _, resourceIn := range in {
82
resourceOut := out.ResourceSpans().AppendEmpty()
83
resourceOut.SetSchemaUrl(resourceIn.GetSchemaUrl())
84
resourceOut.Resource().SetDroppedAttributesCount(resourceIn.GetResource().GetDroppedAttributesCount())
85
copyMap(resourceIn.GetResource().GetAttributes(), resourceOut.Resource().Attributes())
86
87
resourceOut.ScopeSpans().EnsureCapacity(len(resourceIn.GetScopeSpans()))
88
89
for _, scopeIn := range resourceIn.GetScopeSpans() {
90
scopeOut := resourceOut.ScopeSpans().AppendEmpty()
91
scopeOut.SetSchemaUrl(scopeIn.GetSchemaUrl())
92
scopeOut.Scope().SetName(scopeIn.GetScope().GetName())
93
scopeOut.Scope().SetVersion(scopeIn.GetScope().GetVersion())
94
scopeOut.Scope().SetDroppedAttributesCount(scopeIn.GetScope().GetDroppedAttributesCount())
95
copyMap(scopeIn.GetScope().GetAttributes(), scopeOut.Scope().Attributes())
96
97
scopeOut.Spans().EnsureCapacity(len(scopeIn.GetSpans()))
98
99
for _, spanIn := range scopeIn.GetSpans() {
100
spanOut := scopeOut.Spans().AppendEmpty()
101
spanOut.SetName(spanIn.GetName())
102
spanOut.SetKind(convertKind(spanIn.Kind))
103
spanOut.SetTraceID(convertTraceID(spanIn.GetTraceId()))
104
spanOut.SetSpanID(convertSpanID(spanIn.GetSpanId()))
105
spanOut.SetParentSpanID(convertSpanID(spanIn.GetParentSpanId()))
106
spanOut.SetStartTimestamp(convertTimestamp(spanIn.GetStartTimeUnixNano()))
107
spanOut.SetEndTimestamp(convertTimestamp(spanIn.GetEndTimeUnixNano()))
108
109
spanOut.Status().SetCode(convertStatus(spanIn.GetStatus().GetCode()))
110
spanOut.Status().SetMessage(spanIn.GetStatus().GetMessage())
111
112
spanOut.TraceState().FromRaw(spanIn.GetTraceState())
113
114
spanOut.SetDroppedAttributesCount(spanIn.GetDroppedAttributesCount())
115
spanOut.SetDroppedEventsCount(spanIn.GetDroppedEventsCount())
116
spanOut.SetDroppedAttributesCount(spanIn.GetDroppedAttributesCount())
117
spanOut.SetDroppedLinksCount(spanIn.GetDroppedLinksCount())
118
copyMap(spanIn.GetAttributes(), spanOut.Attributes())
119
120
spanOut.Events().EnsureCapacity(len(spanIn.GetEvents()))
121
for _, eventIn := range spanIn.GetEvents() {
122
eventOut := spanOut.Events().AppendEmpty()
123
eventOut.SetName(eventIn.GetName())
124
eventOut.SetTimestamp(convertTimestamp(eventIn.GetTimeUnixNano()))
125
126
eventOut.SetDroppedAttributesCount(eventIn.GetDroppedAttributesCount())
127
copyMap(eventIn.GetAttributes(), eventOut.Attributes())
128
}
129
130
spanOut.Links().EnsureCapacity(len(spanIn.GetLinks()))
131
for _, linkIn := range spanIn.GetLinks() {
132
linkOut := spanOut.Links().AppendEmpty()
133
linkOut.SetTraceID(convertTraceID(linkIn.GetTraceId()))
134
linkOut.SetSpanID(convertSpanID(linkIn.GetSpanId()))
135
136
linkOut.SetDroppedAttributesCount(linkIn.GetDroppedAttributesCount())
137
copyMap(linkIn.GetAttributes(), linkOut.Attributes())
138
}
139
}
140
}
141
}
142
143
return out
144
}
145
146
// copyMap writes every key/value pair of from into to, converting each value
// with copyValue. Capacity for len(from) entries is reserved up front.
func copyMap(from []*commonpb.KeyValue, to pcommon.Map) {
	to.EnsureCapacity(len(from))

	for i := range from {
		entry := from[i]
		dst := to.PutEmpty(entry.GetKey())
		copyValue(entry.GetValue(), dst)
	}
}
154
155
func copyValue(from *commonpb.AnyValue, to pcommon.Value) {
156
switch val := from.GetValue().(type) {
157
case *commonpb.AnyValue_StringValue:
158
to.SetStr(val.StringValue)
159
160
case *commonpb.AnyValue_BoolValue:
161
to.SetBool(val.BoolValue)
162
163
case *commonpb.AnyValue_IntValue:
164
to.SetInt(val.IntValue)
165
166
case *commonpb.AnyValue_DoubleValue:
167
to.SetDouble(val.DoubleValue)
168
169
case *commonpb.AnyValue_ArrayValue:
170
slice := to.SetEmptySlice()
171
slice.EnsureCapacity(len(val.ArrayValue.Values))
172
173
for _, element := range val.ArrayValue.Values {
174
sliceVal := slice.AppendEmpty()
175
copyValue(element, sliceVal)
176
}
177
178
case *commonpb.AnyValue_KvlistValue:
179
targetMap := to.SetEmptyMap()
180
copyMap(val.KvlistValue.GetValues(), targetMap)
181
182
case *commonpb.AnyValue_BytesValue:
183
to.SetEmptyBytes().FromRaw(val.BytesValue)
184
}
185
}
186
187
// convertTimestamp reinterprets inUnixNano as a pcommon.Timestamp using a
// plain type conversion; the numeric value is unchanged.
func convertTimestamp(inUnixNano uint64) pcommon.Timestamp {
	ts := pcommon.Timestamp(inUnixNano)
	return ts
}
190
191
func convertKind(in tracepb.Span_SpanKind) ptrace.SpanKind {
192
switch in {
193
case tracepb.Span_SPAN_KIND_UNSPECIFIED:
194
return ptrace.SpanKindUnspecified
195
case tracepb.Span_SPAN_KIND_INTERNAL:
196
return ptrace.SpanKindInternal
197
case tracepb.Span_SPAN_KIND_SERVER:
198
return ptrace.SpanKindServer
199
case tracepb.Span_SPAN_KIND_CLIENT:
200
return ptrace.SpanKindClient
201
case tracepb.Span_SPAN_KIND_PRODUCER:
202
return ptrace.SpanKindProducer
203
case tracepb.Span_SPAN_KIND_CONSUMER:
204
return ptrace.SpanKindConsumer
205
}
206
207
return ptrace.SpanKindUnspecified
208
}
209
210
// convertTraceID copies the bytes of in into a pcommon.TraceID.
//
// The built-in copy transfers only as many bytes as fit: a shorter input
// leaves the remaining bytes zero and a longer input is truncated.
func convertTraceID(in []byte) pcommon.TraceID {
	id := pcommon.TraceID{}
	copy(id[:], in)
	return id
}
215
216
// convertSpanID copies the bytes of in into a pcommon.SpanID.
//
// The built-in copy transfers only as many bytes as fit: a shorter input
// leaves the remaining bytes zero and a longer input is truncated.
func convertSpanID(in []byte) pcommon.SpanID {
	id := pcommon.SpanID{}
	copy(id[:], in)
	return id
}
221
222
func convertStatus(in tracepb.Status_StatusCode) ptrace.StatusCode {
223
switch in {
224
case tracepb.Status_STATUS_CODE_UNSET:
225
return ptrace.StatusCodeUnset
226
case tracepb.Status_STATUS_CODE_OK:
227
return ptrace.StatusCodeOk
228
case tracepb.Status_STATUS_CODE_ERROR:
229
return ptrace.StatusCodeError
230
}
231
232
return ptrace.StatusCodeUnset
233
}
234
235