Path: blob/main/component/otelcol/internal/lazyconsumer/lazyconsumer.go
4096 views
// Package lazyconsumer implements a lazy OpenTelemetry Collector consumer1// which can lazily forward request to another consumer implementation.2package lazyconsumer34import (5"context"6"sync"78otelcomponent "go.opentelemetry.io/collector/component"9otelconsumer "go.opentelemetry.io/collector/consumer"10"go.opentelemetry.io/collector/pdata/plog"11"go.opentelemetry.io/collector/pdata/pmetric"12"go.opentelemetry.io/collector/pdata/ptrace"13)1415// Consumer is a lazily-loaded consumer.16type Consumer struct {17ctx context.Context1819mut sync.RWMutex20metricsConsumer otelconsumer.Metrics21logsConsumer otelconsumer.Logs22tracesConsumer otelconsumer.Traces23}2425var (26_ otelconsumer.Traces = (*Consumer)(nil)27_ otelconsumer.Metrics = (*Consumer)(nil)28_ otelconsumer.Logs = (*Consumer)(nil)29)3031// New creates a new Consumer. The provided ctx is used to determine when the32// Consumer should stop accepting data; if the ctx is closed, no further data33// will be accepted.34func New(ctx context.Context) *Consumer {35return &Consumer{ctx: ctx}36}3738// Capabilities implements otelconsumer.baseConsumer.39func (c *Consumer) Capabilities() otelconsumer.Capabilities {40return otelconsumer.Capabilities{41// MutatesData is always set to false; the lazy consumer will check the42// underlying consumer's capabilities prior to forwarding data and will43// pass a copy if the underlying consumer mutates data.44MutatesData: false,45}46}4748// ConsumeTraces implements otelconsumer.Traces.49func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {50if c.ctx.Err() != nil {51return c.ctx.Err()52}5354c.mut.RLock()55defer c.mut.RUnlock()5657if c.tracesConsumer == nil {58return otelcomponent.ErrDataTypeIsNotSupported59}6061if c.tracesConsumer.Capabilities().MutatesData {62newTraces := ptrace.NewTraces()63td.CopyTo(newTraces)64td = newTraces65}66return c.tracesConsumer.ConsumeTraces(ctx, td)67}6869// ConsumeMetrics implements otelconsumer.Metrics.70func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {71if c.ctx.Err() != nil {72return c.ctx.Err()73}7475c.mut.RLock()76defer c.mut.RUnlock()7778if c.metricsConsumer == nil {79return otelcomponent.ErrDataTypeIsNotSupported80}8182if c.metricsConsumer.Capabilities().MutatesData {83newMetrics := pmetric.NewMetrics()84md.CopyTo(newMetrics)85md = newMetrics86}87return c.metricsConsumer.ConsumeMetrics(ctx, md)88}8990// ConsumeLogs implements otelconsumer.Logs.91func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {92if c.ctx.Err() != nil {93return c.ctx.Err()94}9596c.mut.RLock()97defer c.mut.RUnlock()9899if c.logsConsumer == nil {100return otelcomponent.ErrDataTypeIsNotSupported101}102103if c.logsConsumer.Capabilities().MutatesData {104newLogs := plog.NewLogs()105ld.CopyTo(newLogs)106ld = newLogs107}108return c.logsConsumer.ConsumeLogs(ctx, ld)109}110111// SetConsumers updates the internal consumers that Consumer will forward data112// to. It is valid for any combination of m, l, and t to be nil.113func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) {114c.mut.Lock()115defer c.mut.Unlock()116117c.metricsConsumer = m118c.logsConsumer = l119c.tracesConsumer = t120}121122123