Path: blob/main/component/otelcol/extension/jaeger_remote_sampling/internal/jaegerremotesampling/extension.go
4096 views
// Copyright The OpenTelemetry Authors1//2// Licensed under the Apache License, Version 2.0 (the "License");3// you may not use this file except in compliance with the License.4// You may obtain a copy of the License at5//6// http://www.apache.org/licenses/LICENSE-2.07//8// Unless required by applicable law or agreed to in writing, software9// distributed under the License is distributed on an "AS IS" BASIS,10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11// See the License for the specific language governing permissions and12// limitations under the License.1314package jaegerremotesampling1516import (17"context"18"fmt"1920grpcStore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"21"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"22"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"23"go.opentelemetry.io/collector/component"24"go.uber.org/zap"25"google.golang.org/grpc"2627"github.com/grafana/agent/component/otelcol/extension/jaeger_remote_sampling/internal/jaegerremotesampling/internal"28"github.com/grafana/agent/component/otelcol/extension/jaeger_remote_sampling/internal/strategy_store"29)3031var _ component.Extension = (*jrsExtension)(nil)3233type jrsExtension struct {34cfg *Config35telemetry component.TelemetrySettings3637httpServer component.Component38grpcServer component.Component39samplingStore strategystore.StrategyStore4041closers []func() error42}4344func newExtension(cfg *Config, telemetry component.TelemetrySettings) *jrsExtension {45jrse := &jrsExtension{46cfg: cfg,47telemetry: telemetry,48}49return jrse50}5152func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {53// the config validation will take care of ensuring we have one and only one of the following about the54// source of the sampling config:55// - remote (gRPC)56// - local file57// - contents (string)58// we can then use a simplified logic here to assign the appropriate store59if jrse.cfg.Source.File != "" {60opts := static.Options{61StrategiesFile: jrse.cfg.Source.File,62ReloadInterval: jrse.cfg.Source.ReloadInterval,63}64ss, err := static.NewStrategyStore(opts, jrse.telemetry.Logger)65if err != nil {66return fmt.Errorf("failed to create the local file strategy store: %w", err)67}6869// there's a Close function on the concrete type, which is not visible to us...70// how can we close it then?71jrse.samplingStore = ss72}7374if jrse.cfg.Source.Remote != nil {75opts, err := jrse.cfg.Source.Remote.ToDialOptions(host, jrse.telemetry)76if err != nil {77return fmt.Errorf("error while setting up the remote sampling source: %w", err)78}79conn, err := grpc.Dial(jrse.cfg.Source.Remote.Endpoint, opts...)80if err != nil {81return fmt.Errorf("error while connecting to the remote sampling source: %w", err)82}8384jrse.samplingStore = grpcStore.NewConfigManager(conn)85jrse.closers = append(jrse.closers, func() error {86return conn.Close()87})88}8990if jrse.cfg.Source.Contents != "" {91ss, err := strategy_store.NewStrategyStore(jrse.cfg.Source.Contents, jrse.telemetry.Logger)92if err != nil {93return fmt.Errorf("error while setting up the contents sampling source: %w", err)94}95jrse.samplingStore = ss96}9798if jrse.cfg.HTTPServerSettings != nil {99httpServer, err := internal.NewHTTP(jrse.telemetry, *jrse.cfg.HTTPServerSettings, jrse.samplingStore)100if err != nil {101return fmt.Errorf("error while creating the HTTP server: %w", err)102}103jrse.httpServer = httpServer104// then we start our own server interfaces, starting with the HTTP one105if err := jrse.httpServer.Start(ctx, host); err != nil {106return fmt.Errorf("error while starting the HTTP server: %w", err)107}108}109110if jrse.cfg.GRPCServerSettings != nil {111grpcServer, err := internal.NewGRPC(jrse.telemetry, *jrse.cfg.GRPCServerSettings, jrse.samplingStore)112if err != nil {113return fmt.Errorf("error while creating the gRPC server: %w", err)114}115jrse.grpcServer = grpcServer116// start our gRPC server interface117if err := jrse.grpcServer.Start(ctx, host); err != nil {118return fmt.Errorf("error while starting the gRPC server: %w", err)119}120}121122return nil123}124125func (jrse *jrsExtension) Shutdown(ctx context.Context) error {126// we probably don't want to break whenever an error occurs, we want to continue and close the other resources127if jrse.httpServer != nil {128if err := jrse.httpServer.Shutdown(ctx); err != nil {129jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err))130}131}132133if jrse.grpcServer != nil {134if err := jrse.grpcServer.Shutdown(ctx); err != nil {135jrse.telemetry.Logger.Error("error while shutting down the gRPC server", zap.Error(err))136}137}138139for _, closer := range jrse.closers {140if err := closer(); err != nil {141jrse.telemetry.Logger.Error("error while shutting down the sampling store", zap.Error(err))142}143}144145return nil146}147148149