Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/exporter/prometheus/internal/convert/cache.go
4100 views
1
package convert
2
3
import (
4
"sync"
5
"time"
6
7
"github.com/prometheus/common/model"
8
"github.com/prometheus/prometheus/model/labels"
9
"github.com/prometheus/prometheus/model/metadata"
10
"github.com/prometheus/prometheus/model/timestamp"
11
"github.com/prometheus/prometheus/storage"
12
)
13
14
// memorySeries is an in-memory series mapped from an OpenTelemetry Collector
15
// data point.
16
type memorySeries struct {
17
// We shouldn't need an RWMutex here because there should only ever be
18
// exactly one goroutine for each memory series, since each series is
19
// intended to be unique.
20
sync.Mutex
21
22
labels labels.Labels // Labels used for writing.
23
metadata map[string]string // Extra (optional) metadata used for conversion.
24
25
id storage.SeriesRef // id returned by storage.Appender.
26
27
timestamp time.Time // Timestamp used for out-of-order detection.
28
lastSeen time.Time // Timestamp used for garbage collection.
29
30
value float64 // Value used for writing.
31
}
32
33
func newMemorySeries(metadata map[string]string, labels labels.Labels) *memorySeries {
34
return &memorySeries{
35
metadata: metadata,
36
labels: labels,
37
}
38
}
39
40
// Metadata returns a metadata value by key.
41
func (series *memorySeries) Metadata(key string) string {
42
if series.metadata == nil {
43
return ""
44
}
45
return series.metadata[key]
46
}
47
48
// Timestamp returns the current timestamp of this series.
49
func (series *memorySeries) Timestamp() time.Time {
50
series.Lock()
51
defer series.Unlock()
52
return series.timestamp
53
}
54
55
// SetTimestamp updates the current timestamp of this series.
56
func (series *memorySeries) SetTimestamp(newTime time.Time) {
57
// TODO(rfratto): does this need to be a CAS-style function instead?
58
series.Lock()
59
defer series.Unlock()
60
series.timestamp = newTime
61
}
62
63
// LastSeen returns the timestamp when this series was last seen.
64
func (series *memorySeries) LastSeen() time.Time {
65
series.Lock()
66
defer series.Unlock()
67
return series.lastSeen
68
}
69
70
// Ping updates the last seen timestamp of this series.
71
func (series *memorySeries) Ping() {
72
series.Lock()
73
defer series.Unlock()
74
series.lastSeen = time.Now()
75
}
76
77
// Value gets the current value of this series.
78
func (series *memorySeries) Value() float64 {
79
series.Lock()
80
defer series.Unlock()
81
return series.value
82
}
83
84
// SetValue updates the current value of this series.
85
func (series *memorySeries) SetValue(newValue float64) {
86
// TODO(rfratto): does this need to be a CAS-style function instead?
87
series.Lock()
88
defer series.Unlock()
89
series.value = newValue
90
}
91
92
func (series *memorySeries) WriteTo(app storage.Appender, ts time.Time) error {
93
series.Lock()
94
defer series.Unlock()
95
96
newID, err := app.Append(series.id, series.labels, timestamp.FromTime(ts), series.value)
97
if err != nil {
98
return err
99
}
100
101
if newID != series.id {
102
series.id = newID
103
}
104
return nil
105
}
106
107
type memoryMetadata struct {
108
sync.Mutex
109
110
// ID returned by the underlying storage.Appender.
111
ID storage.SeriesRef
112
Name string
113
114
lastSeen time.Time
115
metadata metadata.Metadata
116
117
// Used for determining when a write needs to occur.
118
lastWrite, lastUpdate time.Time
119
}
120
121
// WriteTo writes the metadata to app if the metadata has changed since the
122
// last update, otherwise WriteTo is a no-op.
123
func (md *memoryMetadata) WriteTo(app storage.Appender, ts time.Time) error {
124
md.Lock()
125
defer md.Unlock()
126
127
if !md.lastWrite.Before(md.lastUpdate) {
128
return nil
129
}
130
131
labels := labels.FromStrings(model.MetricNameLabel, md.Name)
132
133
ref, err := app.UpdateMetadata(md.ID, labels, md.metadata)
134
if err != nil {
135
return err
136
}
137
if ref != md.ID {
138
md.ID = ref
139
}
140
141
md.lastWrite = md.lastUpdate
142
return nil
143
}
144
145
// Update updates the metadata used by md. The next call to WriteTo will write
146
// the new metadata only if m is different from the last metadata stored.
147
func (md *memoryMetadata) Update(m metadata.Metadata) {
148
md.Lock()
149
defer md.Unlock()
150
151
md.lastSeen = time.Now()
152
153
// Metadata hasn't changed; don't do anything.
154
if m == md.metadata {
155
return
156
}
157
158
md.metadata = m
159
md.lastUpdate = time.Now()
160
}
161
162