package write
import (
"context"
"fmt"
"sync"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/common/loki/client"
"github.com/grafana/agent/pkg/build"
)
var streamLagLabels = []string{"filename"}
func init() {
component.Register(component.Registration{
Name: "loki.write",
Args: Arguments{},
Exports: Exports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
client.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
}
type Arguments struct {
Endpoints []EndpointOptions `river:"endpoint,block,optional"`
ExternalLabels map[string]string `river:"external_labels,attr,optional"`
MaxStreams int `river:"max_streams,attr,optional"`
}
type Exports struct {
Receiver loki.LogsReceiver `river:"receiver,attr"`
}
var (
_ component.Component = (*Component)(nil)
)
type Component struct {
opts component.Options
metrics *client.Metrics
mut sync.RWMutex
args Arguments
receiver loki.LogsReceiver
clients []client.Client
}
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
metrics: client.NewMetrics(o.Registerer, streamLagLabels),
}
c.receiver = make(loki.LogsReceiver)
o.OnStateChange(Exports{Receiver: c.receiver})
if err := c.Update(args); err != nil {
return nil, err
}
return c, nil
}
func (c *Component) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case entry := <-c.receiver:
for _, client := range c.clients {
if client != nil {
select {
case <-ctx.Done():
return nil
case client.Chan() <- entry:
}
}
}
}
}
}
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)
c.mut.Lock()
defer c.mut.Unlock()
c.args = newArgs
for _, client := range c.clients {
if client != nil {
client.Stop()
}
}
c.clients = make([]client.Client, len(newArgs.Endpoints))
cfgs := newArgs.convertClientConfigs()
for _, cfg := range cfgs {
client, err := client.New(c.metrics, cfg, streamLagLabels, newArgs.MaxStreams, c.opts.Logger)
if err != nil {
return err
}
c.clients = append(c.clients, client)
}
return nil
}