Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(canary): Add test to check query results with and without cache. #13104

Merged
merged 8 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clients/cmd/promtail/promtail-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ scrape_configs:
labels:
job: varlogs
__path__: /var/log/*log
stream: stdout
6 changes: 5 additions & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func main() {
metricTestQueryRange := flag.Duration("metric-test-range", 24*time.Hour, "The range value [24h] used in the metric test instant-query."+
" Note: this value is truncated to the running time of the canary until this value is reached")

cacheTestInterval := flag.Duration("cache-test-interval", 15*time.Minute, "The interval the cache test query should be run")
cacheTestQueryRange := flag.Duration("cache-test-range", 24*time.Hour, "The range value [24h] used in the cache test instant-query.")
cacheTestQueryNow := flag.Duration("cache-test-now", 1*time.Hour, "duration how far back from current time the execution time (--now) should be set for running this query in the cache test instant-query.")

spotCheckInterval := flag.Duration("spot-check-interval", 15*time.Minute, "Interval that a single result will be kept from sent entries and spot-checked against Loki, "+
"e.g. 15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached")
spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it")
Expand Down Expand Up @@ -189,7 +193,7 @@ func main() {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create reader for Loki querier, check config: %s", err)
os.Exit(1)
}
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *cacheTestInterval, *cacheTestQueryRange, *cacheTestQueryNow, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

startCanary()
Expand Down
99 changes: 90 additions & 9 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package comparator
import (
"fmt"
"io"
"math"
"math/rand"
"sync"
"time"
Expand All @@ -24,6 +25,8 @@ const (
DebugWebsocketMissingEntry = "websocket missing entry: %v\n"
DebugQueryResult = "confirmation query result: %v\n"
DebugEntryFound = "missing websocket entry %v was found %v seconds after it was originally sent\n"

floatDiffTolerance = 1e-6
)

var (
Expand Down Expand Up @@ -90,6 +93,16 @@ var (
Help: "how long the spot check test query execution took in seconds.",
Buckets: instrument.DefBuckets,
})
queryResultsDiff = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "cache_test_query_results_diff_total",
Help: "counts number of times the query results was different with and without cache ",
})
queryResultsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "cache_test_query_results_total",
Help: "counts number of times the query results test requests are done ",
}, []string{"status"}) // status=success/failure
)

type Comparator struct {
Expand All @@ -98,6 +111,7 @@ type Comparator struct {
spotEntMtx sync.Mutex // Locks access to []spotCheck
spotMtx sync.Mutex // Locks spotcheckRunning for single threaded but async spotCheck()
metTestMtx sync.Mutex // Locks metricTestRunning for single threaded but async metricTest()
cacheTestMtx sync.Mutex // Locks cacheTestRunning for single threaded but async cacheTest()
pruneMtx sync.Mutex // Locks pruneEntriesRunning for single threaded but async pruneEntries()
w io.Writer
entries []*time.Time
Expand All @@ -116,14 +130,19 @@ type Comparator struct {
metricTestInterval time.Duration
metricTestRange time.Duration
metricTestRunning bool
writeInterval time.Duration
confirmAsync bool
startTime time.Time
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
quit chan struct{}
done chan struct{}
cacheTestInterval time.Duration
cacheTestRange time.Duration
// how far back from current time the execution time (--now) should be set for running this query.
cacheTestNow time.Duration
cacheTestRunning bool
writeInterval time.Duration
confirmAsync bool
startTime time.Time
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
quit chan struct{}
done chan struct{}
}

func NewComparator(writer io.Writer,
Expand All @@ -133,6 +152,9 @@ func NewComparator(writer io.Writer,
spotCheckInterval, spotCheckMax, spotCheckQueryRate, spotCheckWait time.Duration,
metricTestInterval time.Duration,
metricTestRange time.Duration,
cacheTestInterval time.Duration,
cacheTestRange time.Duration,
cacheTestNow time.Duration,
writeInterval time.Duration,
buckets int,
sentChan chan time.Time,
Expand All @@ -155,6 +177,10 @@ func NewComparator(writer io.Writer,
metricTestInterval: metricTestInterval,
metricTestRange: metricTestRange,
metricTestRunning: false,
cacheTestInterval: cacheTestInterval,
cacheTestRange: cacheTestRange,
cacheTestNow: cacheTestNow,
cacheTestRunning: false,
writeInterval: writeInterval,
confirmAsync: confirmAsync,
startTime: time.Now(),
Expand Down Expand Up @@ -252,10 +278,12 @@ func (c *Comparator) run() {
randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))
mt := time.NewTicker(time.Duration(randomGenerator.Int63n(c.metricTestInterval.Nanoseconds())))
sc := time.NewTicker(c.spotCheckQueryRate)
ct := time.NewTicker(c.cacheTestInterval)
defer func() {
t.Stop()
mt.Stop()
sc.Stop()
ct.Stop()
close(c.done)
}()

Expand Down Expand Up @@ -294,12 +322,65 @@ func (c *Comparator) run() {
firstMt = false
mt.Reset(c.metricTestInterval)
}
case <-ct.C:
// Only run one instance of cache tests at a time.
c.cacheTestMtx.Lock()
if !c.cacheTestRunning {
c.cacheTestRunning = true
go c.cacheTest(time.Now())
}
c.cacheTestMtx.Unlock()

case <-c.quit:
return
}
}
}

// cacheTest issues the same count-over-time instant query twice, once with
// cache and once without, and increments queryResultsDiff when the two
// answers differ by more than floatDiffTolerance.
//
// The query is evaluated at currTime minus cacheTestNow so that both requests
// look at data that is no longer changing rather than at the latest data,
// which is a moving target.
func (c *Comparator) cacheTest(currTime time.Time) {
	// Clear cacheTestRunning on every exit path so a later tick can start a new run.
	defer func() {
		c.cacheTestMtx.Lock()
		defer c.cacheTestMtx.Unlock()
		c.cacheTestRunning = false
	}()

	evalAt := currTime.Add(-c.cacheTestNow)
	if evalAt.Before(c.startTime) {
		// The evaluation instant predates this process start; report it and try again on a later run.
		fmt.Fprintf(c.w, "cacheTest not run. still waiting for query start range(%s) to past the process start time(%s).\n", evalAt, c.startTime)
		return
	}

	// The range is passed to the reader as a whole number of seconds, e.g. "86400s".
	window := fmt.Sprintf("%.0fs", c.cacheTestRange.Seconds())

	// First request: with cache.
	cached, err := c.rdr.QueryCountOverTime(window, evalAt, true)
	if err != nil {
		fmt.Fprintf(c.w, "error running cache query test with cache: %s\n", err.Error())
		queryResultsTotal.WithLabelValues("failure").Inc()
		return
	}

	// Second request: without cache.
	uncached, err := c.rdr.QueryCountOverTime(window, evalAt, false)
	if err != nil {
		fmt.Fprintf(c.w, "error running cache query test without cache: %s\n", err.Error())
		queryResultsTotal.WithLabelValues("failure").Inc()
		return
	}

	// Both requests returned; the run counts as a success whether or not the values match.
	queryResultsTotal.WithLabelValues("success").Inc()

	mismatch := math.Abs(uncached-cached) > floatDiffTolerance
	if !mismatch {
		return
	}
	queryResultsDiff.Inc()
	fmt.Fprintf(c.w, "found a diff in instant query results time: %s, result_with_cache: %v, result_without_cache: %v\n", evalAt, cached, uncached)
}

// check that the expected # of log lines have been written to Loki
func (c *Comparator) metricTest(currTime time.Time) {
// Always make sure to set the running state back to false
Expand All @@ -317,7 +398,7 @@ func (c *Comparator) metricTest(currTime time.Time) {
adjustedRange = currTime.Sub(c.startTime)
}
begin := time.Now()
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()))
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()), begin, true)
metricTestLatency.Observe(time.Since(begin).Seconds())
if err != nil {
fmt.Fprintf(c.w, "error running metric query test: %s\n", err.Error())
Expand Down
99 changes: 89 additions & 10 deletions pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestEntryNeverReceived(t *testing.T) {
wait := 60 * time.Second
maxWait := 300 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

c.entrySent(t1)
c.entrySent(t2)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestConcurrentConfirmMissing(t *testing.T) {
wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond

c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

for _, t := range found {
tCopy := t
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestPruneAckdEntires(t *testing.T) {
wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Millisecond)
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestSpotCheck(t *testing.T) {
spotCheck := 10 * time.Millisecond
spotCheckMax := 20 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 1*time.Hour, 3*time.Hour, 30*time.Minute, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

// Send all the entries
for i := range entries {
Expand Down Expand Up @@ -360,6 +360,42 @@ func TestSpotCheck(t *testing.T) {
prometheus.Unregister(responseLatency)
}

// TestCacheTest drives cacheTest with a mock reader and checks that the diff
// counter is only incremented when the cached and uncached results differ by
// more than the float tolerance.
func TestCacheTest(t *testing.T) {
	actual := &bytes.Buffer{}
	mr := &mockReader{}
	now := time.Now()
	cacheTestInterval := 500 * time.Millisecond
	cacheTestRange := 30 * time.Second
	cacheTestNow := 2 * time.Second

	c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, 0, cacheTestInterval, cacheTestRange, cacheTestNow, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), mr, false)
	// Pin the start time to a known value well before now-cacheTestNow so the test is not skipped.
	c.startTime = time.Unix(10, 0)

	scenarios := []struct {
		withCache    float64
		withoutCache float64
		wantDiffs    int
	}{
		// Same value with and without cache: no diff.
		{withCache: 2.3, withoutCache: 2.3, wantDiffs: 0},
		// Clearly different values: one diff.
		{withCache: 2.3, withoutCache: 2.5, wantDiffs: 1},
		// Different values, but within tolerance: no diff.
		{withCache: 2.3, withoutCache: 2.30000005, wantDiffs: 0},
	}

	for _, sc := range scenarios {
		// Start every scenario from a fresh counter.
		queryResultsDiff = &mockCounter{}
		mr.countOverTime = sc.withCache
		mr.noCacheCountOvertime = sc.withoutCache
		c.cacheTest(now)
		assert.Equal(t, sc.wantDiffs, queryResultsDiff.(*mockCounter).count)
	}

	// This avoids a panic on subsequent test execution,
	// seems ugly but was easy, and multiple instantiations
	// of the comparator should be an error
	prometheus.Unregister(responseLatency)
}

func TestMetricTest(t *testing.T) {
metricTestActual = &mockGauge{}
metricTestExpected = &mockGauge{}
Expand All @@ -371,7 +407,7 @@ func TestMetricTest(t *testing.T) {
mr := &mockReader{}
metricTestRange := 30 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, 1*time.Hour, 3*time.Hour, 30*time.Minute, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
// Force the start time to a known value
c.startTime = time.Unix(10, 0)

Expand Down Expand Up @@ -456,6 +492,42 @@ func (m *mockCounter) Inc() {
m.count++
}

// mockCounterVec is a test double that embeds mockCounter and remembers the
// label values passed to the most recent WithLabelValues call.
type mockCounterVec struct {
mockCounter
// labels holds the label values from the last WithLabelValues call.
labels []string
}

// WithLabelValues stores lvs as the most recently requested label values and
// returns the embedded mockCounter, so every label combination shares one counter.
func (m *mockCounterVec) WithLabelValues(lvs ...string) prometheus.Counter {
	shared := &m.mockCounter
	m.labels = lvs
	return shared
}

// Desc is an unimplemented stub; it panics if called.
func (m *mockCounterVec) Desc() *prometheus.Desc {
panic("implement me")
}

// Write is an unimplemented stub; it panics if called.
func (m *mockCounterVec) Write(*io_prometheus_client.Metric) error {
panic("implement me")
}

// Describe is an unimplemented stub; it panics if called.
func (m *mockCounterVec) Describe(chan<- *prometheus.Desc) {
panic("implement me")
}

// Collect is an unimplemented stub; it panics if called.
func (m *mockCounterVec) Collect(chan<- prometheus.Metric) {
panic("implement me")
}

// Add is an unimplemented stub; it panics if called, so only Inc can be used
// to change the count.
func (m *mockCounterVec) Add(float64) {
panic("implement me")
}

// Inc adds one to the embedded mockCounter's count while holding its lock.
func (m *mockCounterVec) Inc() {
	m.cLck.Lock()
	m.count++
	m.cLck.Unlock()
}

type mockGauge struct {
cLck sync.Mutex
val float64
Expand Down Expand Up @@ -507,13 +579,20 @@ type mockReader struct {
resp []time.Time
countOverTime float64
queryRange string

// return this value if called without cache.
noCacheCountOvertime float64
}

// Query ignores both time arguments and returns the canned resp slice with a
// nil error.
func (r *mockReader) Query(_ time.Time, _ time.Time) ([]time.Time, error) {
return r.resp, nil
}

func (r *mockReader) QueryCountOverTime(queryRange string) (float64, error) {
func (r *mockReader) QueryCountOverTime(queryRange string, _ time.Time, cache bool) (float64, error) {
r.queryRange = queryRange
return r.countOverTime, nil
res := r.countOverTime
if !cache {
res = r.noCacheCountOvertime
}
return res, nil
}
Loading
Loading