Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/common/loki/client/multi_test.go
4096 views
1
package client
2
3
// This code is copied from Promtail. The client package is used to configure
4
// and run the clients that can send log entries to a Loki instance.
5
6
import (
7
"net/url"
8
"reflect"
9
"testing"
10
"time"
11
12
"github.com/grafana/agent/component/common/loki/client/fake"
13
14
"github.com/go-kit/log"
15
"github.com/grafana/dskit/backoff"
16
"github.com/grafana/dskit/flagext"
17
"github.com/prometheus/client_golang/prometheus"
18
"github.com/prometheus/common/model"
19
"github.com/stretchr/testify/require"
20
21
"github.com/grafana/agent/component/common/loki"
22
23
"github.com/grafana/loki/pkg/logproto"
24
lokiflag "github.com/grafana/loki/pkg/util/flagext"
25
util_log "github.com/grafana/loki/pkg/util/log"
26
)
27
28
var (
29
nilMetrics = NewMetrics(nil, nil)
30
metrics = NewMetrics(prometheus.DefaultRegisterer, nil)
31
)
32
33
func TestNewMulti(t *testing.T) {
34
_, err := NewMulti(nilMetrics, nil, util_log.Logger, 0, []Config{}...)
35
if err == nil {
36
t.Fatal("expected err but got nil")
37
}
38
host1, _ := url.Parse("http://localhost:3100")
39
host2, _ := url.Parse("https://grafana.com")
40
cc1 := Config{
41
BatchSize: 20,
42
BatchWait: 1 * time.Second,
43
URL: flagext.URLValue{URL: host1},
44
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "yaml"}},
45
}
46
cc2 := Config{
47
BatchSize: 10,
48
BatchWait: 1 * time.Second,
49
URL: flagext.URLValue{URL: host2},
50
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"hi": "there"}},
51
}
52
53
clients, err := NewMulti(metrics, nil, util_log.Logger, 0, cc1, cc2)
54
if err != nil {
55
t.Fatalf("expected err: nil got:%v", err)
56
}
57
multi := clients.(*MultiClient)
58
if len(multi.clients) != 2 {
59
t.Fatalf("expected client: 2 got:%d", len(multi.clients))
60
}
61
actualCfg1 := clients.(*MultiClient).clients[0].(*client).cfg
62
// Yaml should overridden the command line so 'order: yaml' should be expected
63
expectedCfg1 := Config{
64
BatchSize: 20,
65
BatchWait: 1 * time.Second,
66
URL: flagext.URLValue{URL: host1},
67
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "yaml"}},
68
}
69
70
if !reflect.DeepEqual(actualCfg1, expectedCfg1) {
71
t.Fatalf("expected cfg: %v got:%v", expectedCfg1, actualCfg1)
72
}
73
}
74
75
func TestNewMulti_BlockDuplicates(t *testing.T) {
76
_, err := NewMulti(nilMetrics, nil, util_log.Logger, 0, []Config{}...)
77
if err == nil {
78
t.Fatal("expected err but got nil")
79
}
80
host1, _ := url.Parse("http://localhost:3100")
81
cc1 := Config{
82
BatchSize: 20,
83
BatchWait: 1 * time.Second,
84
URL: flagext.URLValue{URL: host1},
85
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "yaml"}},
86
}
87
cc1Copy := cc1
88
89
_, err = NewMulti(metrics, nil, util_log.Logger, 0, cc1, cc1Copy)
90
require.Error(t, err, "expected NewMulti to reject duplicate client configs")
91
92
cc1Copy.Name = "copy"
93
clients, err := NewMulti(metrics, nil, util_log.Logger, 0, cc1, cc1Copy)
94
require.NoError(t, err, "expected NewMulti to reject duplicate client configs")
95
96
multi := clients.(*MultiClient)
97
if len(multi.clients) != 2 {
98
t.Fatalf("expected client: 2 got:%d", len(multi.clients))
99
}
100
actualCfg1 := clients.(*MultiClient).clients[0].(*client).cfg
101
// Yaml should overridden the command line so 'order: yaml' should be expected
102
expectedCfg1 := Config{
103
BatchSize: 20,
104
BatchWait: 1 * time.Second,
105
URL: flagext.URLValue{URL: host1},
106
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "yaml"}},
107
}
108
109
if !reflect.DeepEqual(actualCfg1, expectedCfg1) {
110
t.Fatalf("expected cfg: %v got:%v", expectedCfg1, actualCfg1)
111
}
112
}
113
114
func TestMultiClient_Stop(t *testing.T) {
115
var stopped int
116
117
stopping := func() {
118
stopped++
119
}
120
fc := fake.NewClient(stopping)
121
clients := []Client{fc, fc, fc, fc}
122
m := &MultiClient{
123
clients: clients,
124
entries: make(chan loki.Entry),
125
}
126
m.start()
127
m.Stop()
128
129
if stopped != len(clients) {
130
t.Fatal("missing stop call")
131
}
132
}
133
134
func TestMultiClient_Handle(t *testing.T) {
135
f := fake.NewClient(func() {})
136
clients := []Client{f, f, f, f, f, f}
137
m := &MultiClient{
138
clients: clients,
139
entries: make(chan loki.Entry),
140
}
141
m.start()
142
143
m.Chan() <- loki.Entry{Labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Line: "foo"}}
144
145
m.Stop()
146
147
if len(f.Received()) != len(clients) {
148
t.Fatal("missing handle call")
149
}
150
}
151
152
func TestMultiClient_Handle_Race(t *testing.T) {
153
u := flagext.URLValue{}
154
require.NoError(t, u.Set("http://localhost"))
155
c1, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, nil, 0, log.NewNopLogger())
156
require.NoError(t, err)
157
c2, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, nil, 0, log.NewNopLogger())
158
require.NoError(t, err)
159
clients := []Client{c1, c2}
160
m := &MultiClient{
161
clients: clients,
162
entries: make(chan loki.Entry),
163
}
164
m.start()
165
166
m.Chan() <- loki.Entry{Labels: model.LabelSet{"foo": "bar", ReservedLabelTenantID: "1"}, Entry: logproto.Entry{Line: "foo"}}
167
168
m.Stop()
169
}
170
171