Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return empty series if metrics exist in index and have no points #240

Merged
merged 10 commits into from
Jul 27, 2023
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ else
VERSION:=$(shell sh -c 'git describe --always --tags | sed -e "s/^v//i"')
endif

OS ?= linux

SRCS:=$(shell find . -name '*.go')

all: $(NAME)
Expand Down Expand Up @@ -58,10 +60,10 @@ client: $(NAME)
gox-build:
rm -rf out
mkdir -p out
gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="linux" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME)
gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="$(OS)" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME)
ls -la out/
mkdir -p out/root/etc/$(NAME)/
./out/$(NAME)-linux-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf
./out/$(NAME)-$(OS)-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf

fpm-deb:
$(MAKE) fpm-build-deb ARCH=amd64
Expand Down
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
toml "github.com/pelletier/go-toml"
"go.uber.org/zap"

"github.com/lomik/zapwriter"

"github.com/lomik/graphite-clickhouse/cache"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/limiter"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/zapwriter"
)

// Cache config
Expand All @@ -47,12 +48,12 @@ type Common struct {
MaxCPU int `toml:"max-cpu" json:"max-cpu"`
MaxMetricsInFindAnswer int `toml:"max-metrics-in-find-answer" json:"max-metrics-in-find-answer" comment:"limit number of results from find query, 0=unlimited"`
MaxMetricsPerTarget int `toml:"max-metrics-per-target" json:"max-metrics-per-target" comment:"limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited"`
IncludeEmptyMetrics bool `toml:"include-empty-metrics" json:"include-empty-metrics" comment:"if true, always return points for all metrics, replacing empty results with list of NaN"`
TargetBlacklist []string `toml:"target-blacklist" json:"target-blacklist" comment:"daemon returns empty response if query matches any of regular expressions" commented:"true"`
Blacklist []*regexp.Regexp `toml:"-" json:"-"` // compiled TargetBlacklist
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`
FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

FindCache cache.BytesCache `toml:"-" json:"-"`
}
Expand Down
2 changes: 2 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
max-metrics-in-find-answer = 0
# limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited
max-metrics-per-target = 15000
# if true, always return points for all metrics, replacing empty results with list of NaN
include-empty-metrics = false
# daemon returns empty response if query matches any of regular expressions
# target-blacklist = []
# daemon will return the freed memory to the OS when it>0
Expand Down
62 changes: 56 additions & 6 deletions helper/client/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -120,14 +121,25 @@ func Render(client *http.Client, address string, format FormatType, targets []st
return queryParams, nil, resp.Header, NewHttpError(resp.StatusCode, string(b))
}

var metrics []Metric
metrics, err := Decode(b, format)
if err != nil {
return queryParams, nil, resp.Header, err
}
return queryParams, metrics, resp.Header, nil
}

// Decode converts data in the given format into a slice of Metrics
func Decode(b []byte, format FormatType) ([]Metric, error) {
var (
metrics []Metric
err error
)
switch format {
case FormatPb_v3:
var r protov3.MultiFetchResponse
err = r.Unmarshal(b)
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
Expand All @@ -150,7 +162,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
var r protov2.MultiFetchResponse
err = r.Unmarshal(b)
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
Expand All @@ -173,7 +185,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
decoder := pickle.NewDecoder(reader)
p, err := decoder.Decode()
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
for _, v := range p.([]interface{}) {
m := v.(map[interface{}]interface{})
Expand All @@ -195,9 +207,47 @@ func Render(client *http.Client, address string, format FormatType, targets []st
Values: values,
})
}
case FormatJSON:
var r jsonResponse
err = json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
values := make([]float64, len(m.Values))
for i, v := range m.Values {
if v == nil {
values[i] = math.NaN()
} else {
values[i] = *v
}
}
metrics = append(metrics, Metric{
Name: m.Name,
PathExpression: m.PathExpression,
StartTime: m.StartTime,
StopTime: m.StopTime,
StepTime: m.StepTime,
Values: values,
})
}
default:
return queryParams, nil, resp.Header, ErrUnsupportedFormat
return nil, ErrUnsupportedFormat
}
return metrics, nil
}

return queryParams, metrics, resp.Header, nil
// jsonResponse is a simple struct to decode JSON responses for testing purposes
type jsonResponse struct {
	// Metrics holds the decoded series from the "metrics" array of the response body.
	Metrics []jsonMetric `json:"metrics"`
}

// jsonMetric mirrors a single series entry of the JSON render response.
// A nil entry in Values represents an absent point (decoded as NaN by the caller).
type jsonMetric struct {
	Name           string     `json:"name"`           // metric display name
	PathExpression string     `json:"pathExpression"` // target expression that matched this metric
	Values         []*float64 `json:"values"`         // one value per step; nil marks a missing point
	StartTime      int64      `json:"startTime"`      // timestamp of the first point
	StopTime       int64      `json:"stopTime"`       // timestamp of the end of the series
	StepTime       int64      `json:"stepTime"`       // interval between consecutive points
}
5 changes: 4 additions & 1 deletion helper/point/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func FillNulls(points []Point, from, until, step uint32) (start, stop, count uin
count = (stop - start) / step
last := start - step
currentPoint := 0
metricID := points[0].MetricID
var metricID uint32
if len(points) > 0 {
metricID = points[0].MetricID
}
getter = func() (float64, error) {
if stop <= last {
return 0, ErrTimeGreaterStop
Expand Down
65 changes: 45 additions & 20 deletions render/data/ch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

v2pb "github.com/go-graphite/protocol/carbonapi_v2_pb"
v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/lomik/graphite-clickhouse/helper/point"
)

Expand All @@ -14,13 +15,15 @@ type CHResponse struct {
Data *Data
From int64
Until int64
// if true, return points for all metrics, replacing empty results with list of NaN
AppendOutEmptySeries bool
}

// CHResponses is a slice of CHResponse
type CHResponses []CHResponse

// EmptyResponse returns a CHResponses with one element containing emptyData for the following encoding
func EmptyResponse() CHResponses { return CHResponses{{emptyData, 0, 0}} }
func EmptyResponse() CHResponses { return CHResponses{{Data: emptyData}} }

// ToMultiFetchResponseV2 returns protobuf v2pb.MultiFetchResponse message for given CHResponse
func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error) {
Expand Down Expand Up @@ -90,23 +93,8 @@ func (cc *CHResponses) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error
func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToMultiFetchResponseV2 also need changes for Null Points like ToMultiFetchResponseV3

mfr := &v3pb.MultiFetchResponse{Metrics: make([]v3pb.FetchResponse, 0)}
data := c.Data
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
consolidationFunc, err := data.GetAggregation(id)
if err != nil {
return nil, err
}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
start, stop, count, getValue := point.FillNulls(points, uint32(c.From), uint32(c.Until), step)
addResponse := func(name, function string, from, until, step uint32, points []point.Point) error {
start, stop, count, getValue := point.FillNulls(points, from, until, step)
values := make([]float64, 0, count)
for {
value, err := getValue()
Expand All @@ -115,15 +103,15 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error)
break
}
// if err is not point.ErrTimeGreaterStop, the points are corrupted
return nil, err
return err
}
values = append(values, value)
}
for _, a := range data.AM.Get(name) {
fr := v3pb.FetchResponse{
Name: a.DisplayName,
PathExpression: a.Target,
ConsolidationFunc: consolidationFunc,
ConsolidationFunc: function,
StartTime: int64(start),
StopTime: int64(stop),
StepTime: int64(step),
Expand All @@ -135,6 +123,43 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error)
}
mfr.Metrics = append(mfr.Metrics, fr)
}
return nil
}

// process metrics with points
writtenMetrics := make(map[string]struct{})
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
writtenMetrics[name] = struct{}{}
consolidationFunc, err := data.GetAggregation(id)
if err != nil {
return nil, err
}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
if err := addResponse(name, consolidationFunc, uint32(c.From), uint32(c.Until), step, points); err != nil {
return nil, err
}
}
// process metrics with no points
if c.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 {
for _, metricName := range data.AM.Series(false) {
if _, done := writtenMetrics[metricName]; done {
msaf1980 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
err := addResponse(metricName, "any", uint32(c.From), uint32(c.Until), uint32(data.CommonStep), []point.Point{})
if err != nil {
return nil, err
}
}
}
return mfr, nil
}
Expand Down
10 changes: 5 additions & 5 deletions render/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ var ReadUvarint = clickhouse.ReadUvarint
type Data struct {
*point.Points
AM *alias.Map
commonStep int64
CommonStep int64
}

var emptyData *Data = &Data{Points: point.NewPoints()}
var emptyData *Data = &Data{Points: point.NewPoints(), AM: alias.New()}

func contextIsValid(ctx context.Context) error {
select {
Expand All @@ -42,8 +42,8 @@ func contextIsValid(ctx context.Context) error {

// GetStep returns the commonStep for all points or, if unset, step for metric ID id
func (d *Data) GetStep(id uint32) (uint32, error) {
if 0 < d.commonStep {
return uint32(d.commonStep), nil
if 0 < d.CommonStep {
return uint32(d.CommonStep), nil
}
return d.Points.GetStep(id)
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func prepareData(ctx context.Context, targets int, fetcher func() *point.Points)
// setSteps sets commonStep for aggregated requests and per-metric step for non-aggregated
func (d *data) setSteps(cond *conditions) {
if cond.aggregated {
d.commonStep = cond.step
d.CommonStep = cond.step
return
}
d.Points.SetSteps(cond.steps)
Expand Down
9 changes: 7 additions & 2 deletions render/data/multi_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"time"

v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
"go.uber.org/zap"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/errs"
"github.com/lomik/graphite-clickhouse/limiter"
"github.com/lomik/graphite-clickhouse/pkg/alias"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"go.uber.org/zap"
)

// TimeFrame contains information about fetch request time conditions
Expand Down Expand Up @@ -165,7 +166,11 @@ func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext s

for tf, targets := range *m {
tf, targets := tf, targets
cond := &conditions{TimeFrame: &tf, Targets: targets, aggregated: cfg.ClickHouse.InternalAggregation}
cond := &conditions{TimeFrame: &tf,
Targets: targets,
aggregated: cfg.ClickHouse.InternalAggregation,
appendEmptySeries: cfg.Common.IncludeEmptyMetrics,
}
if cond.MaxDataPoints <= 0 || int64(cfg.ClickHouse.MaxDataPoints) < cond.MaxDataPoints {
cond.MaxDataPoints = int64(cfg.ClickHouse.MaxDataPoints)
}
Expand Down
14 changes: 9 additions & 5 deletions render/data/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
"sync/atomic"
"time"

"go.uber.org/zap"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/dry"
"github.com/lomik/graphite-clickhouse/pkg/reverse"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/pkg/where"
"go.uber.org/zap"
)

// from, until, step, function, table, prewhere, where
Expand Down Expand Up @@ -77,6 +78,8 @@ type conditions struct {
prewhere string
// where contains WHERE condition
where string
// show list of NaN values instead of empty results
appendEmptySeries bool
// metricUnreversed grouped by aggregating function
aggregations map[string][]string
// External-data bodies grouped by aggregating function. For non-aggregated requests "" is used as a key
Expand Down Expand Up @@ -213,7 +216,7 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error {

data.Points.Uniq()
rollupStart := time.Now()
err = cond.rollupRules.RollupPoints(data.Points, cond.From, data.commonStep)
err = cond.rollupRules.RollupPoints(data.Points, cond.From, data.CommonStep)
if err != nil {
logger.Error("rollup failed", zap.Error(err))
return err
Expand All @@ -229,9 +232,10 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error {
data.AM = cond.AM

q.appendReply(CHResponse{
Data: data.Data,
From: cond.From,
Until: cond.Until,
Data: data.Data,
From: cond.From,
Until: cond.Until,
AppendOutEmptySeries: cond.appendEmptySeries,
})
return nil
}
Expand Down
Loading
Loading