Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/extension/jaeger_remote_sampling/internal/jaegerremotesampling/extension.go
4096 views
1
// Copyright The OpenTelemetry Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
package jaegerremotesampling
16
17
import (
18
"context"
19
"fmt"
20
21
grpcStore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
22
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
23
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
24
"go.opentelemetry.io/collector/component"
25
"go.uber.org/zap"
26
"google.golang.org/grpc"
27
28
"github.com/grafana/agent/component/otelcol/extension/jaeger_remote_sampling/internal/jaegerremotesampling/internal"
29
"github.com/grafana/agent/component/otelcol/extension/jaeger_remote_sampling/internal/strategy_store"
30
)
31
32
// Compile-time assertion that *jrsExtension implements component.Extension.
var _ component.Extension = (*jrsExtension)(nil)
33
34
type jrsExtension struct {
35
cfg *Config
36
telemetry component.TelemetrySettings
37
38
httpServer component.Component
39
grpcServer component.Component
40
samplingStore strategystore.StrategyStore
41
42
closers []func() error
43
}
44
45
func newExtension(cfg *Config, telemetry component.TelemetrySettings) *jrsExtension {
46
jrse := &jrsExtension{
47
cfg: cfg,
48
telemetry: telemetry,
49
}
50
return jrse
51
}
52
53
func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {
54
// the config validation will take care of ensuring we have one and only one of the following about the
55
// source of the sampling config:
56
// - remote (gRPC)
57
// - local file
58
// - contents (string)
59
// we can then use a simplified logic here to assign the appropriate store
60
if jrse.cfg.Source.File != "" {
61
opts := static.Options{
62
StrategiesFile: jrse.cfg.Source.File,
63
ReloadInterval: jrse.cfg.Source.ReloadInterval,
64
}
65
ss, err := static.NewStrategyStore(opts, jrse.telemetry.Logger)
66
if err != nil {
67
return fmt.Errorf("failed to create the local file strategy store: %w", err)
68
}
69
70
// there's a Close function on the concrete type, which is not visible to us...
71
// how can we close it then?
72
jrse.samplingStore = ss
73
}
74
75
if jrse.cfg.Source.Remote != nil {
76
opts, err := jrse.cfg.Source.Remote.ToDialOptions(host, jrse.telemetry)
77
if err != nil {
78
return fmt.Errorf("error while setting up the remote sampling source: %w", err)
79
}
80
conn, err := grpc.Dial(jrse.cfg.Source.Remote.Endpoint, opts...)
81
if err != nil {
82
return fmt.Errorf("error while connecting to the remote sampling source: %w", err)
83
}
84
85
jrse.samplingStore = grpcStore.NewConfigManager(conn)
86
jrse.closers = append(jrse.closers, func() error {
87
return conn.Close()
88
})
89
}
90
91
if jrse.cfg.Source.Contents != "" {
92
ss, err := strategy_store.NewStrategyStore(jrse.cfg.Source.Contents, jrse.telemetry.Logger)
93
if err != nil {
94
return fmt.Errorf("error while setting up the contents sampling source: %w", err)
95
}
96
jrse.samplingStore = ss
97
}
98
99
if jrse.cfg.HTTPServerSettings != nil {
100
httpServer, err := internal.NewHTTP(jrse.telemetry, *jrse.cfg.HTTPServerSettings, jrse.samplingStore)
101
if err != nil {
102
return fmt.Errorf("error while creating the HTTP server: %w", err)
103
}
104
jrse.httpServer = httpServer
105
// then we start our own server interfaces, starting with the HTTP one
106
if err := jrse.httpServer.Start(ctx, host); err != nil {
107
return fmt.Errorf("error while starting the HTTP server: %w", err)
108
}
109
}
110
111
if jrse.cfg.GRPCServerSettings != nil {
112
grpcServer, err := internal.NewGRPC(jrse.telemetry, *jrse.cfg.GRPCServerSettings, jrse.samplingStore)
113
if err != nil {
114
return fmt.Errorf("error while creating the gRPC server: %w", err)
115
}
116
jrse.grpcServer = grpcServer
117
// start our gRPC server interface
118
if err := jrse.grpcServer.Start(ctx, host); err != nil {
119
return fmt.Errorf("error while starting the gRPC server: %w", err)
120
}
121
}
122
123
return nil
124
}
125
126
// Shutdown stops the HTTP server, then the gRPC server, then runs every
// registered closer. A failure in one step is logged and does not prevent the
// remaining resources from being released; the method always returns nil.
func (jrse *jrsExtension) Shutdown(ctx context.Context) error {
	logger := jrse.telemetry.Logger

	// Servers are shut down in a fixed order: HTTP first, gRPC second.
	servers := []struct {
		srv     component.Component
		failure string
	}{
		{jrse.httpServer, "error while shutting down the HTTP server"},
		{jrse.grpcServer, "error while shutting down the gRPC server"},
	}
	for _, entry := range servers {
		// A server is nil when it was not configured or never created.
		if entry.srv == nil {
			continue
		}
		if err := entry.srv.Shutdown(ctx); err != nil {
			logger.Error(entry.failure, zap.Error(err))
		}
	}

	for i := range jrse.closers {
		err := jrse.closers[i]()
		if err == nil {
			continue
		}
		logger.Error("error while shutting down the sampling store", zap.Error(err))
	}

	return nil
}
148
149