package process
import (
"context"
"reflect"
"sync"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/loki/process/internal/stages"
)
// init registers the loki.process component so that it can be instantiated
// by name from configuration.
func init() {
	// build adapts New to the generic constructor signature expected by the
	// registry; the type assertion panics if args is not an Arguments value.
	build := func(opts component.Options, args component.Arguments) (component.Component, error) {
		return New(opts, args.(Arguments))
	}

	component.Register(component.Registration{
		Name:    "loki.process",
		Args:    Arguments{},
		Exports: Exports{},
		Build:   build,
	})
}
// Arguments holds the configuration accepted by the loki.process component.
type Arguments struct {
// ForwardTo lists the receivers every processed entry is sent to.
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
// Stages holds the stage configurations used to build the processing
// pipeline. The field is optional.
Stages []stages.StageConfig `river:"stage,enum,optional"`
}
// Exports holds the values the loki.process component exposes to other
// components.
type Exports struct {
// Receiver is the channel on which this component accepts log entries.
Receiver loki.LogsReceiver `river:"receiver,attr"`
}
// Compile-time check that *Component satisfies component.Component.
var (
_ component.Component = (*Component)(nil)
)
// Component implements the loki.process component. Entries read from
// receiver are pushed into the processing pipeline through processIn; the
// pipeline's results arrive on processOut and are sent to every receiver in
// fanout.
type Component struct {
// opts holds the options the component was constructed with.
opts component.Options
// mut is taken for writing by Update and for reading by Run, handleIn and
// handleOut while they access the fields that Update replaces.
mut sync.RWMutex
// receiver is the exported channel that handleIn reads entries from.
receiver loki.LogsReceiver
// fanout is the list of receivers handleOut sends each processed entry to.
fanout []loki.LogsReceiver
// processIn is the send side of the current pipeline's input channel.
processIn chan<- loki.Entry
// processOut is the channel handed to the entry handler; handleOut reads
// processed entries from it.
processOut chan loki.Entry
// entryHandler is the handler the current pipeline is wrapped around.
entryHandler loki.EntryHandler
// stages is the stage configuration most recently applied; Update compares
// against it to decide whether the pipeline must be rebuilt.
stages []stages.StageConfig
}
// New creates a loki.process component. It allocates the receiver and
// pipeline output channels, publishes the receiver through the component's
// exports, and then applies args via Update. If Update fails, its error is
// returned and no component is produced.
func New(o component.Options, args Arguments) (*Component, error) {
	comp := &Component{
		opts:       o,
		receiver:   make(loki.LogsReceiver),
		processOut: make(loki.LogsReceiver),
	}

	// Export the receiver before the first Update, matching the order in
	// which downstream state is published and the pipeline is built.
	o.OnStateChange(Exports{Receiver: comp.receiver})

	err := comp.Update(args)
	if err != nil {
		return nil, err
	}
	return comp, nil
}
// Run starts the handleIn and handleOut goroutines and blocks until both have
// returned, which they do once ctx is cancelled. On the way out it stops the
// current entry handler (if any) and closes the pipeline's output and input
// channels. Run always returns nil.
func (c *Component) Run(ctx context.Context) error {
	// Cleanup runs after both workers have exited.
	defer func() {
		c.mut.RLock()
		if handler := c.entryHandler; handler != nil {
			handler.Stop()
		}
		close(c.processOut)
		close(c.processIn)
		c.mut.RUnlock()
	}()

	var workers sync.WaitGroup
	workers.Add(2)
	go c.handleIn(ctx, &workers)
	go c.handleOut(ctx, &workers)
	workers.Wait()

	return nil
}
// Update applies a new set of Arguments. The processing pipeline is rebuilt
// when no stages have been stored yet or when the stage configuration differs
// from the stored one; in that case the previous entry handler is stopped
// before the new pipeline is created. The fanout list is replaced on every
// successful call. If building the pipeline fails, the error is returned and
// neither the stored stages nor the fanout list are changed.
//
// args must be an Arguments value; any other type causes a panic.
func (c *Component) Update(args component.Arguments) error {
	cfg := args.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()

	rebuild := c.stages == nil || stagesChanged(c.stages, cfg.Stages)
	if rebuild {
		if old := c.entryHandler; old != nil {
			old.Stop()
		}

		pipeline, err := stages.NewPipeline(c.opts.Logger, cfg.Stages, &c.opts.ID, c.opts.Registerer)
		if err != nil {
			return err
		}

		// The pipeline writes its results to processOut through the entry
		// handler and exposes its input side as a channel.
		c.entryHandler = loki.NewEntryHandler(c.processOut, func() {})
		c.processIn = pipeline.Wrap(c.entryHandler).Chan()
		c.stages = cfg.Stages
	}

	c.fanout = cfg.ForwardTo
	return nil
}
// handleIn moves entries from the component's receiver into the processing
// pipeline until ctx is cancelled. It signals wg when it returns.
//
// The read lock is held while sending so that Update cannot swap processIn in
// the middle of a send. It is released on every path out of the inner select,
// including cancellation; returning with the read lock still held could block
// a pending Update and, through it, the cleanup in Run.
func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case entry := <-c.receiver:
			c.mut.RLock()
			select {
			case <-ctx.Done():
				c.mut.RUnlock()
				return
			case c.processIn <- entry:
				c.mut.RUnlock()
			}
		}
	}
}
// handleOut reads processed entries from processOut and sends each one to
// every receiver in the current fanout list until ctx is cancelled. It
// signals wg when it returns.
//
// The fanout slice is copied out under the read lock and the lock is released
// before any send. This keeps the lock from being leaked when ctx is
// cancelled mid-send, and stops a slow downstream receiver from blocking
// Update. Update replaces the slice wholesale, so the snapshot stays valid
// for the entry being delivered.
func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case entry := <-c.processOut:
			c.mut.RLock()
			fanout := c.fanout
			c.mut.RUnlock()

			for _, f := range fanout {
				select {
				case <-ctx.Done():
					return
				case f <- entry:
				}
			}
		}
	}
}
// stagesChanged reports whether next differs from prev, either in length or
// in any element at the same index (compared with reflect.DeepEqual). Two
// slices of length zero are considered unchanged, whether or not they are nil.
func stagesChanged(prev, next []stages.StageConfig) bool {
	if len(next) != len(prev) {
		return true
	}
	for i, old := range prev {
		if same := reflect.DeepEqual(old, next[i]); !same {
			return true
		}
	}
	return false
}