package wal
import (
"sync"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
)
// memSeries is the in-memory record kept for a single series. The embedded
// mutex guards lastTs (see updateTimestamp and gc, which both lock the series
// before touching it).
type memSeries struct {
sync.Mutex
// ref is the series reference; it is the key used in the per-ref maps.
ref chunks.HeadSeriesRef
// lset is the label set identifying the series; hash-bucket lookups compare it
// with labels.Equal.
lset labels.Labels
// lastTs is the most recent timestamp accepted by updateTimestamp; gc compares
// it against its mint argument to decide whether the series is removed.
lastTs int64
}
// updateTimestamp stores newTs as the series' latest timestamp unless it is
// strictly older than the one already recorded. It reports whether the stored
// value was (re)written. The series mutex is held for the whole call.
func (m *memSeries) updateTimestamp(newTs int64) bool {
	m.Lock()
	defer m.Unlock()

	if newTs < m.lastTs {
		// Older than what we already have: leave lastTs untouched.
		return false
	}
	m.lastTs = newTs
	return true
}
// seriesHashmap maps a label-set hash to every series stored under that hash.
// A bucket may hold several series; they are told apart by comparing label
// sets (Get, Set) or refs (Delete).
type seriesHashmap map[uint64][]*memSeries
// Get returns the series stored under hash whose label set equals lset, or
// nil when the bucket holds no such series.
func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
	bucket := m[hash]
	for i := range bucket {
		if candidate := bucket[i]; labels.Equal(candidate.lset, lset) {
			return candidate
		}
	}
	return nil
}
// Set stores s in the bucket for hash. If the bucket already contains a
// series with the same label set, that entry is overwritten in place;
// otherwise s is appended to the bucket.
func (m seriesHashmap) Set(hash uint64, s *memSeries) {
	bucket := m[hash]
	for idx := range bucket {
		if !labels.Equal(bucket[idx].lset, s.lset) {
			continue
		}
		bucket[idx] = s
		return
	}
	m[hash] = append(bucket, s)
}
// Delete removes every series with the given ref from the bucket for hash.
// When nothing is left in the bucket, the hash key itself is removed.
//
// The survivors are collected into a freshly allocated slice rather than
// filtered in place, so the slice previously stored in the bucket is never
// modified.
func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
	var kept []*memSeries
	for _, candidate := range m[hash] {
		if candidate.ref == ref {
			continue
		}
		kept = append(kept, candidate)
	}

	if len(kept) > 0 {
		m[hash] = kept
		return
	}
	delete(m, hash)
}
// stripeSeries holds series split across size stripes, each guarded by its
// own lock. A stripe index is obtained by masking a ref or a hash with
// size-1, so size is expected to be a power of two.
type stripeSeries struct {
// size is the number of stripes.
size int
// series holds, per stripe, the series keyed by ref (stripe chosen by ref).
series []map[chunks.HeadSeriesRef]*memSeries
// hashes holds, per stripe, the series keyed by label hash (stripe chosen by
// hash).
hashes []seriesHashmap
// exemplars holds, per stripe, the latest exemplar keyed by series ref
// (stripe chosen by ref).
exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar
// locks holds one lock per stripe; locks[i] guards series[i], hashes[i] and
// exemplars[i].
locks []stripeLock
// gcMut is held for the full duration of gc, so gc calls do not overlap.
gcMut sync.Mutex
}
// stripeLock is the read/write lock guarding one stripe.
type stripeLock struct {
sync.RWMutex
// Padding after the mutex. NOTE(review): presumably sized so each stripeLock
// fills its own cache line and neighbouring locks do not false-share —
// confirm against the target architecture.
_ [40]byte
}
// newStripeSeries builds a stripeSeries with stripeSize stripes, each with
// its own empty ref map, hash map, exemplar map and lock.
//
// Stripe indices are computed elsewhere by masking with stripeSize-1, so
// stripeSize is expected to be a power of two.
func newStripeSeries(stripeSize int) *stripeSeries {
	var (
		seriesByRef    = make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize)
		seriesByHash   = make([]seriesHashmap, stripeSize)
		exemplarsByRef = make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize)
	)

	// All three slices share the same length, so one pass initialises them all.
	for i := 0; i < stripeSize; i++ {
		seriesByRef[i] = make(map[chunks.HeadSeriesRef]*memSeries)
		seriesByHash[i] = make(seriesHashmap)
		exemplarsByRef[i] = make(map[chunks.HeadSeriesRef]*exemplar.Exemplar)
	}

	return &stripeSeries{
		size:      stripeSize,
		series:    seriesByRef,
		hashes:    seriesByHash,
		exemplars: exemplarsByRef,
		locks:     make([]stripeLock, stripeSize),
	}
}
// gc removes every series whose lastTs is below mint and returns the set of
// refs that were removed. A removed series is dropped from the ref map, the
// hash map and the exemplar map.
//
// gc walks the stripes by hash. For each stale series it additionally takes
// the lock of the stripe that owns the series' ref, so it may hold two stripe
// locks (plus the series lock) at the same time.
func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
// Only one gc runs at a time.
s.gcMut.Lock()
defer s.gcMut.Unlock()
deleted := map[chunks.HeadSeriesRef]struct{}{}
for hashLock := 0; hashLock < s.size; hashLock++ {
// Write-lock the stripe whose hash buckets are being scanned.
s.locks[hashLock].Lock()
for hash, all := range s.hashes[hashLock] {
// seriesHashmap.Delete replaces the bucket with a new slice, so the slice
// ranged over here (all) is not modified by the Delete call below.
for _, series := range all {
series.Lock()
// Series with a timestamp at or after mint are kept.
if series.lastTs >= mint {
series.Unlock()
continue
}
// The ref-keyed maps live in the stripe selected by the ref, which can
// differ from the hash stripe. Lock it only when it is a different stripe:
// the hash stripe is already write-locked and must not be locked twice.
refLock := int(series.ref) & (s.size - 1)
if hashLock != refLock {
s.locks[refLock].Lock()
}
deleted[series.ref] = struct{}{}
delete(s.series[refLock], series.ref)
s.hashes[hashLock].Delete(hash, series.ref)
delete(s.exemplars[refLock], series.ref)
if hashLock != refLock {
s.locks[refLock].Unlock()
}
series.Unlock()
}
}
s.locks[hashLock].Unlock()
}
return deleted
}
// GetByID returns the series registered under the given ref, or nil if there
// is none. The stripe owning the ref is read-locked during the lookup.
func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries {
	stripe := uint64(id) & uint64(s.size-1)

	lock := &s.locks[stripe]
	lock.RLock()
	defer lock.RUnlock()

	return s.series[stripe][id]
}
// GetByHash returns the series stored under hash whose label set equals lset,
// or nil if there is none. The stripe owning the hash is read-locked during
// the lookup.
func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
	stripe := hash & uint64(s.size-1)

	lock := &s.locks[stripe]
	lock.RLock()
	defer lock.RUnlock()

	return s.hashes[stripe].Get(hash, lset)
}
// Set registers series under both its ref and the given label hash.
//
// The ref stripe and the hash stripe are updated one after the other, and the
// first lock is released before the second is taken, so this method never
// holds two stripe locks at once.
// NOTE(review): gc holds a hash stripe lock while acquiring a ref stripe
// lock; keeping the two critical sections separate here presumably avoids a
// lock-order deadlock with gc — confirm before merging them.
func (s *stripeSeries) Set(hash uint64, series *memSeries) {
var (
hashLock = hash & uint64(s.size-1)
refLock = uint64(series.ref) & uint64(s.size-1)
)
// First make the series reachable by ref...
s.locks[refLock].Lock()
s.series[refLock][series.ref] = series
s.locks[refLock].Unlock()
// ...then by hash. An existing entry with the same label set is replaced.
s.locks[hashLock].Lock()
s.hashes[hashLock].Set(hash, series)
s.locks[hashLock].Unlock()
}
// GetLatestExemplar returns the exemplar most recently stored for ref, or nil
// if none has been stored. The stripe owning the ref is read-locked during
// the lookup.
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
	stripe := uint64(ref) & uint64(s.size-1)

	lock := &s.locks[stripe]
	lock.RLock()
	defer lock.RUnlock()

	return s.exemplars[stripe][ref]
}
// SetLatestExemplar records exemplar as the latest exemplar for ref. The
// exemplar is stored only when a series is currently registered under ref;
// otherwise the call does nothing. The stripe owning the ref is write-locked
// for the duration.
func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
	stripe := uint64(ref) & uint64(s.size-1)

	lock := &s.locks[stripe]
	lock.Lock()
	defer lock.Unlock()

	if s.series[stripe][ref] == nil {
		// No such series: do not keep an exemplar for it.
		return
	}
	s.exemplars[stripe][ref] = exemplar
}
// iterator returns a new iterator over the series held by s.
func (s *stripeSeries) iterator() *stripeSeriesIterator {
	it := stripeSeriesIterator{s: s}
	return &it
}
// stripeSeriesIterator iterates over the series of a stripeSeries; see
// Channel.
type stripeSeriesIterator struct {
// s is the stripeSeries being iterated.
s *stripeSeries
}
// Channel starts a goroutine that sends every series, stripe by stripe, on
// the returned channel and closes the channel once all stripes have been
// visited.
//
// The channel is unbuffered and each send happens while the goroutine holds
// the read lock of the stripe being walked, the series' own lock and, when it
// differs, the read lock of the stripe selected by the series' label hash.
// Those locks are therefore held until the receiver takes the value; a
// receiver that stops reading before the channel is closed leaves the
// goroutine blocked with the locks still held.
//
// NOTE(review): both this goroutine and gc hold one stripe lock while
// acquiring another (and the series lock); confirm the two cannot run
// concurrently in a way that deadlocks.
func (it *stripeSeriesIterator) Channel() <-chan *memSeries {
ret := make(chan *memSeries)
go func() {
for i := 0; i < it.s.size; i++ {
// Read-lock the stripe whose ref map is being ranged over.
it.s.locks[i].RLock()
for _, series := range it.s.series[i] {
series.Lock()
// j is the stripe selected by the series' label hash; read-lock it as well
// unless it is the stripe already locked above.
j := int(series.lset.Hash()) & (it.s.size - 1)
if i != j {
it.s.locks[j].RLock()
}
// Blocks until the consumer receives the series.
ret <- series
if i != j {
it.s.locks[j].RUnlock()
}
series.Unlock()
}
it.s.locks[i].RUnlock()
}
close(ret)
}()
return ret
}