Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/common/loki/client/multi.go
4096 views
1
package client
2
3
// This code is copied from Promtail. The client package is used to configure
4
// and run the clients that can send log entries to a Loki instance.
5
6
import (
7
"errors"
8
"fmt"
9
"strings"
10
"sync"
11
12
"github.com/go-kit/log"
13
14
"github.com/grafana/agent/component/common/loki"
15
)
16
17
// MultiClient is client pushing to one or more loki instances.
18
type MultiClient struct {
19
clients []Client
20
entries chan loki.Entry
21
wg sync.WaitGroup
22
23
once sync.Once
24
}
25
26
// NewMulti creates a new client
27
func NewMulti(metrics *Metrics, streamLagLabels []string, logger log.Logger, maxStreams int, cfgs ...Config) (Client, error) {
28
var fake struct{}
29
30
if len(cfgs) == 0 {
31
return nil, errors.New("at least one client config should be provided")
32
}
33
clientsCheck := make(map[string]struct{})
34
clients := make([]Client, 0, len(cfgs))
35
for _, cfg := range cfgs {
36
client, err := New(metrics, cfg, streamLagLabels, maxStreams, logger)
37
if err != nil {
38
return nil, err
39
}
40
41
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
42
if _, ok := clientsCheck[client.Name()]; ok {
43
return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for URL: %s", cfg.URL)
44
}
45
46
clientsCheck[client.Name()] = fake
47
clients = append(clients, client)
48
}
49
multi := &MultiClient{
50
clients: clients,
51
entries: make(chan loki.Entry),
52
}
53
multi.start()
54
return multi, nil
55
}
56
57
// start launches the goroutine that reads from m.entries and sends each entry
// to every underlying client's channel, in slice order. The goroutine exits
// once m.entries is closed and drained; m.wg is used to wait for it.
func (m *MultiClient) start() {
	forward := func() {
		defer m.wg.Done()
		for entry := range m.entries {
			for _, target := range m.clients {
				target.Chan() <- entry
			}
		}
	}

	m.wg.Add(1)
	go forward()
}
68
69
// Chan returns the send-only channel on which entries are submitted; every
// entry sent here is forwarded to all underlying clients.
func (m *MultiClient) Chan() chan<- loki.Entry {
	var in chan<- loki.Entry = m.entries
	return in
}
72
73
// Stop implements Client
74
func (m *MultiClient) Stop() {
75
m.once.Do(func() { close(m.entries) })
76
m.wg.Wait()
77
for _, c := range m.clients {
78
c.Stop()
79
}
80
}
81
82
// StopNow implements Client
83
func (m *MultiClient) StopNow() {
84
for _, c := range m.clients {
85
c.StopNow()
86
}
87
}
88
89
// Name returns "multi:" followed by the comma-separated names of the
// underlying clients, in slice order.
func (m *MultiClient) Name() string {
	names := make([]string, 0, len(m.clients))
	for _, client := range m.clients {
		names = append(names, client.Name())
	}
	return "multi:" + strings.Join(names, ",")
}
100
101