From dd45bed5e7468f81ffccdf8e551fbf51f77d3a40 Mon Sep 17 00:00:00 2001 From: Owamoyo Evans Date: Thu, 27 Jul 2023 10:55:20 +0300 Subject: [PATCH] return empty series if metrics exist in index and has no points (#240) --- Makefile | 6 +- cmd/e2e-test/main.go | 2 +- config/config.go | 7 +- doc/config.md | 2 + helper/client/render.go | 62 ++++- helper/point/func.go | 5 +- render/data/ch_response.go | 122 ++++++--- render/data/data.go | 10 +- render/data/multi_target.go | 9 +- render/data/query.go | 14 +- render/handler.go | 3 +- render/reply/formatter_test.go | 240 ++++++++++++++++++ render/reply/json.go | 7 +- render/reply/pickle.go | 29 ++- render/reply/protobuf.go | 21 +- render/reply/v3_pb_test.go | 13 +- .../carbon-clickhouse.conf.tpl | 45 ++++ .../graphite-clickhouse.conf.tpl | 34 +++ tests/emptyseries_append/test.toml | 115 +++++++++ .../carbon-clickhouse.conf.tpl | 45 ++++ .../graphite-clickhouse.conf.tpl | 34 +++ tests/emptyseries_noappend/test.toml | 104 ++++++++ 22 files changed, 849 insertions(+), 80 deletions(-) create mode 100644 render/reply/formatter_test.go create mode 100644 tests/emptyseries_append/carbon-clickhouse.conf.tpl create mode 100644 tests/emptyseries_append/graphite-clickhouse.conf.tpl create mode 100644 tests/emptyseries_append/test.toml create mode 100644 tests/emptyseries_noappend/carbon-clickhouse.conf.tpl create mode 100644 tests/emptyseries_noappend/graphite-clickhouse.conf.tpl create mode 100644 tests/emptyseries_noappend/test.toml diff --git a/Makefile b/Makefile index c89394bca..3f4d296fb 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,8 @@ else VERSION:=$(shell sh -c 'git describe --always --tags | sed -e "s/^v//i"') endif +OS ?= linux + SRCS:=$(shell find . -name '*.go') all: $(NAME) @@ -58,10 +60,10 @@ client: $(NAME) gox-build: rm -rf out mkdir -p out - gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="linux" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME) + gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="$(OS)" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME) ls -la out/ mkdir -p out/root/etc/$(NAME)/ - ./out/$(NAME)-linux-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf + ./out/$(NAME)-$(OS)-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf fpm-deb: $(MAKE) fpm-build-deb ARCH=amd64 diff --git a/cmd/e2e-test/main.go b/cmd/e2e-test/main.go index 1e03b35b5..d804dcb98 100644 --- a/cmd/e2e-test/main.go +++ b/cmd/e2e-test/main.go @@ -145,7 +145,7 @@ func main() { verifyCount := 0 verifyFailed := 0 - _, err = cmdExec(DockerBinary, "network", "exists", DockerNetwork) + _, err = cmdExec(DockerBinary, "network", "inspect", DockerNetwork) if err != nil { out, err := cmdExec(DockerBinary, "network", "create", DockerNetwork) if err != nil { diff --git a/config/config.go b/config/config.go index 1fbf3ee70..5b6cb1e24 100644 --- a/config/config.go +++ b/config/config.go @@ -18,12 +18,13 @@ import ( toml "github.com/pelletier/go-toml" "go.uber.org/zap" + "github.com/lomik/zapwriter" + "github.com/lomik/graphite-clickhouse/cache" "github.com/lomik/graphite-clickhouse/helper/date" "github.com/lomik/graphite-clickhouse/helper/rollup" "github.com/lomik/graphite-clickhouse/limiter" "github.com/lomik/graphite-clickhouse/metrics" - "github.com/lomik/zapwriter" ) // Cache config @@ -47,12 +48,12 @@ type Common struct { MaxCPU int `toml:"max-cpu" json:"max-cpu"` MaxMetricsInFindAnswer int `toml:"max-metrics-in-find-answer" json:"max-metrics-in-find-answer" comment:"limit number of results from find query, 0=unlimited"` MaxMetricsPerTarget int `toml:"max-metrics-per-target" json:"max-metrics-per-target" comment:"limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited"` + AppendEmptySeries bool `toml:"append-empty-series" json:"append-empty-series" comment:"if true, always return points for all metrics, replacing empty results with list of NaN"` TargetBlacklist []string `toml:"target-blacklist" json:"target-blacklist" comment:"daemon returns empty response if query matches any of regular expressions" commented:"true"` Blacklist []*regexp.Regexp `toml:"-" json:"-"` // compiled TargetBlacklist MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"` HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"` - - FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"` + FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"` FindCache cache.BytesCache `toml:"-" json:"-"` } diff --git a/doc/config.md b/doc/config.md index 030bbad78..4ccb8fdda 100644 --- a/doc/config.md +++ b/doc/config.md @@ -182,6 +182,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str max-metrics-in-find-answer = 0 # limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited max-metrics-per-target = 15000 + # if true, always return points for all metrics, replacing empty results with list of NaN + append-empty-series = false # daemon returns empty response if query matches any of regular expressions # target-blacklist = [] # daemon will return the freed memory to the OS when it>0 diff --git a/helper/client/render.go b/helper/client/render.go index 168c9fb00..554254580 100644 --- a/helper/client/render.go +++ b/helper/client/render.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "encoding/json" "errors" "fmt" "io" @@ -120,14 +121,25 @@ func Render(client *http.Client, address string, format FormatType, targets []st return queryParams, nil, resp.Header, NewHttpError(resp.StatusCode, string(b)) } - var metrics []Metric + metrics, err := Decode(b, format) + if err != nil { + return queryParams, nil, resp.Header, err + } + return queryParams, metrics, resp.Header, nil +} +// Decode converts data in the give format to a Metric +func Decode(b []byte, format FormatType) ([]Metric, error) { + var ( + metrics []Metric + err error + ) switch format { case FormatPb_v3: var r protov3.MultiFetchResponse err = r.Unmarshal(b) if err != nil { - return queryParams, nil, resp.Header, err + return nil, err } metrics = make([]Metric, 0, len(r.Metrics)) for _, m := range r.Metrics { @@ -150,7 +162,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st var r protov2.MultiFetchResponse err = r.Unmarshal(b) if err != nil { - return queryParams, nil, resp.Header, err + return nil, err } metrics = make([]Metric, 0, len(r.Metrics)) for _, m := range r.Metrics { @@ -173,7 +185,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st decoder := pickle.NewDecoder(reader) p, err := decoder.Decode() if err != nil { - return queryParams, nil, resp.Header, err + return nil, err } for _, v := range p.([]interface{}) { m := v.(map[interface{}]interface{}) @@ -195,9 +207,47 @@ func Render(client *http.Client, address string, format FormatType, targets []st Values: values, }) } + case FormatJSON: + var r jsonResponse + err = json.Unmarshal(b, &r) + if err != nil { + return nil, err + } + metrics = make([]Metric, 0, len(r.Metrics)) + for _, m := range r.Metrics { + values := make([]float64, len(m.Values)) + for i, v := range m.Values { + if v == nil { + values[i] = math.NaN() + } else { + values[i] = *v + } + } + metrics = append(metrics, Metric{ + Name: m.Name, + PathExpression: m.PathExpression, + StartTime: m.StartTime, + StopTime: m.StopTime, + StepTime: m.StepTime, + Values: values, + }) + } default: - return queryParams, nil, resp.Header, ErrUnsupportedFormat + return nil, ErrUnsupportedFormat } + return metrics, nil +} - return queryParams, metrics, resp.Header, nil +// jsonResponse is a simple struct to decode JSON responses for testing purposes +type jsonResponse struct { + Metrics []jsonMetric `json:"metrics"` +} + +type jsonMetric struct { + Name string `json:"name"` + PathExpression string `json:"pathExpression"` + Values []*float64 `json:"values"` + StartTime int64 `json:"startTime"` + StopTime int64 `json:"stopTime"` + StepTime int64 `json:"stepTime"` } diff --git a/helper/point/func.go b/helper/point/func.go index 04617d73c..2ad3b25a6 100644 --- a/helper/point/func.go +++ b/helper/point/func.go @@ -59,7 +59,10 @@ func FillNulls(points []Point, from, until, step uint32) (start, stop, count uin count = (stop - start) / step last := start - step currentPoint := 0 - metricID := points[0].MetricID + var metricID uint32 + if len(points) > 0 { + metricID = points[0].MetricID + } getter = func() (float64, error) { if stop <= last { return 0, ErrTimeGreaterStop diff --git a/render/data/ch_response.go b/render/data/ch_response.go index 3b9061f38..96cd3a8f9 100644 --- a/render/data/ch_response.go +++ b/render/data/ch_response.go @@ -6,6 +6,7 @@ import ( v2pb "github.com/go-graphite/protocol/carbonapi_v2_pb" v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb" + "github.com/lomik/graphite-clickhouse/helper/point" ) @@ -14,31 +15,24 @@ type CHResponse struct { Data *Data From int64 Until int64 + // if true, return points for all metrics, replacing empty results with list of NaN + AppendOutEmptySeries bool } // CHResponses is a slice of CHResponse type CHResponses []CHResponse // EmptyResponse returns an CHResponses with one element containing emptyData for the following encoding -func EmptyResponse() CHResponses { return CHResponses{{emptyData, 0, 0}} } +func EmptyResponse() CHResponses { return CHResponses{{Data: emptyData}} } // ToMultiFetchResponseV2 returns protobuf v2pb.MultiFetchResponse message for given CHResponse func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error) { mfr := &v2pb.MultiFetchResponse{Metrics: make([]v2pb.FetchResponse, 0)} data := c.Data - nextMetric := data.GroupByMetric() - for { - points := nextMetric() - if len(points) == 0 { - break - } - id := points[0].MetricID - name := data.MetricName(id) - step, err := data.GetStep(id) - if err != nil { - return nil, err - } - start, stop, count, getValue := point.FillNulls(points, uint32(c.From), uint32(c.Until), step) + + addResponse := func(name string, step uint32, points []point.Point) error { + from, until := uint32(c.From), uint32(c.Until) + start, stop, count, getValue := point.FillNulls(points, from, until, step) values := make([]float64, 0, count) isAbsent := make([]bool, 0, count) for { @@ -48,15 +42,15 @@ func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error) break } // if err is not point.ErrTimeGreaterStop, the points are corrupted - return nil, err + return err } if math.IsNaN(value) { values = append(values, 0) isAbsent = append(isAbsent, true) - continue + } else { + values = append(values, value) + isAbsent = append(isAbsent, false) } - values = append(values, value) - isAbsent = append(isAbsent, false) } for _, a := range data.AM.Get(name) { fr := v2pb.FetchResponse{ @@ -69,6 +63,38 @@ func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error) } mfr.Metrics = append(mfr.Metrics, fr) } + return nil + } + + // process metrics with points + writtenMetrics := make(map[string]struct{}) + nextMetric := data.GroupByMetric() + for { + points := nextMetric() + if len(points) == 0 { + break + } + id := points[0].MetricID + name := data.MetricName(id) + writtenMetrics[name] = struct{}{} + step, err := data.GetStep(id) + if err != nil { + return nil, err + } + if err := addResponse(name, step, points); err != nil { + return nil, err + } + } + // process metrics with no points + if c.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 { + for _, metricName := range data.AM.Series(false) { + if _, done := writtenMetrics[metricName]; !done { + err := addResponse(metricName, uint32(data.CommonStep), []point.Point{}) + if err != nil { + return nil, err + } + } + } } return mfr, nil } @@ -90,23 +116,9 @@ func (cc *CHResponses) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) { mfr := &v3pb.MultiFetchResponse{Metrics: make([]v3pb.FetchResponse, 0)} data := c.Data - nextMetric := data.GroupByMetric() - for { - points := nextMetric() - if len(points) == 0 { - break - } - id := points[0].MetricID - name := data.MetricName(id) - consolidationFunc, err := data.GetAggregation(id) - if err != nil { - return nil, err - } - step, err := data.GetStep(id) - if err != nil { - return nil, err - } - start, stop, count, getValue := point.FillNulls(points, uint32(c.From), uint32(c.Until), step) + addResponse := func(name, function string, step uint32, points []point.Point) error { + from, until := uint32(c.From), uint32(c.Until) + start, stop, count, getValue := point.FillNulls(points, from, until, step) values := make([]float64, 0, count) for { value, err := getValue() @@ -115,7 +127,7 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) break } // if err is not point.ErrTimeGreaterStop, the points are corrupted - return nil, err + return err } values = append(values, value) } @@ -123,7 +135,7 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) fr := v3pb.FetchResponse{ Name: a.DisplayName, PathExpression: a.Target, - ConsolidationFunc: consolidationFunc, + ConsolidationFunc: function, StartTime: int64(start), StopTime: int64(stop), StepTime: int64(step), @@ -135,6 +147,42 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) } mfr.Metrics = append(mfr.Metrics, fr) } + return nil + } + + // process metrics with points + writtenMetrics := make(map[string]struct{}) + nextMetric := data.GroupByMetric() + for { + points := nextMetric() + if len(points) == 0 { + break + } + id := points[0].MetricID + name := data.MetricName(id) + writtenMetrics[name] = struct{}{} + consolidationFunc, err := data.GetAggregation(id) + if err != nil { + return nil, err + } + step, err := data.GetStep(id) + if err != nil { + return nil, err + } + if err := addResponse(name, consolidationFunc, step, points); err != nil { + return nil, err + } + } + // process metrics with no points + if c.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 { + for _, metricName := range data.AM.Series(false) { + if _, done := writtenMetrics[metricName]; !done { + err := addResponse(metricName, "any", uint32(data.CommonStep), []point.Point{}) + if err != nil { + return nil, err + } + } + } } return mfr, nil } diff --git a/render/data/data.go b/render/data/data.go index 2c7084a66..42786e819 100644 --- a/render/data/data.go +++ b/render/data/data.go @@ -26,10 +26,10 @@ var ReadUvarint = clickhouse.ReadUvarint type Data struct { *point.Points AM *alias.Map - commonStep int64 + CommonStep int64 } -var emptyData *Data = &Data{Points: point.NewPoints()} +var emptyData *Data = &Data{Points: point.NewPoints(), AM: alias.New()} func contextIsValid(ctx context.Context) error { select { @@ -42,8 +42,8 @@ func contextIsValid(ctx context.Context) error { // GetStep returns the commonStep for all points or, if unset, step for metric ID id func (d *Data) GetStep(id uint32) (uint32, error) { - if 0 < d.commonStep { - return uint32(d.commonStep), nil + if 0 < d.CommonStep { + return uint32(d.CommonStep), nil } return d.Points.GetStep(id) } @@ -127,7 +127,7 @@ func prepareData(ctx context.Context, targets int, fetcher func() *point.Points) // setSteps sets commonStep for aggregated requests and per-metric step for non-aggregated func (d *data) setSteps(cond *conditions) { if cond.aggregated { - d.commonStep = cond.step + d.CommonStep = cond.step return } d.Points.SetSteps(cond.steps) diff --git a/render/data/multi_target.go b/render/data/multi_target.go index cff2a418c..e1ae3bd28 100644 --- a/render/data/multi_target.go +++ b/render/data/multi_target.go @@ -8,12 +8,13 @@ import ( "time" v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb" + "go.uber.org/zap" + "github.com/lomik/graphite-clickhouse/config" "github.com/lomik/graphite-clickhouse/helper/errs" "github.com/lomik/graphite-clickhouse/limiter" "github.com/lomik/graphite-clickhouse/pkg/alias" "github.com/lomik/graphite-clickhouse/pkg/scope" - "go.uber.org/zap" ) // TimeFrame contains information about fetch request time conditions @@ -165,7 +166,11 @@ func (m *MultiTarget) Fetch(ctx context.Context, cfg *config.Config, chContext s for tf, targets := range *m { tf, targets := tf, targets - cond := &conditions{TimeFrame: &tf, Targets: targets, aggregated: cfg.ClickHouse.InternalAggregation} + cond := &conditions{TimeFrame: &tf, + Targets: targets, + aggregated: cfg.ClickHouse.InternalAggregation, + appendEmptySeries: cfg.Common.AppendEmptySeries, + } if cond.MaxDataPoints <= 0 || int64(cfg.ClickHouse.MaxDataPoints) < cond.MaxDataPoints { cond.MaxDataPoints = int64(cfg.ClickHouse.MaxDataPoints) } diff --git a/render/data/query.go b/render/data/query.go index 90b308d6a..5e69770fe 100644 --- a/render/data/query.go +++ b/render/data/query.go @@ -10,6 +10,8 @@ import ( "sync/atomic" "time" + "go.uber.org/zap" + "github.com/lomik/graphite-clickhouse/config" "github.com/lomik/graphite-clickhouse/helper/clickhouse" "github.com/lomik/graphite-clickhouse/metrics" @@ -17,7 +19,6 @@ import ( "github.com/lomik/graphite-clickhouse/pkg/reverse" "github.com/lomik/graphite-clickhouse/pkg/scope" "github.com/lomik/graphite-clickhouse/pkg/where" - "go.uber.org/zap" ) // from, until, step, function, table, prewhere, where @@ -77,6 +78,8 @@ type conditions struct { prewhere string // where contains WHERE condition where string + // show list of NaN values instead of empty results + appendEmptySeries bool // metricUnreversed grouped by aggregating function aggregations map[string][]string // External-data bodies grouped by aggregatig function. For non-aggregated requests "" used as a key @@ -213,7 +216,7 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error { data.Points.Uniq() rollupStart := time.Now() - err = cond.rollupRules.RollupPoints(data.Points, cond.From, data.commonStep) + err = cond.rollupRules.RollupPoints(data.Points, cond.From, data.CommonStep) if err != nil { logger.Error("rollup failed", zap.Error(err)) return err @@ -229,9 +232,10 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error { data.AM = cond.AM q.appendReply(CHResponse{ - Data: data.Data, - From: cond.From, - Until: cond.Until, + Data: data.Data, + From: cond.From, + Until: cond.Until, + AppendOutEmptySeries: cond.appendEmptySeries, }) return nil } diff --git a/render/handler.go b/render/handler.go index c62ff3f7d..a45cca934 100644 --- a/render/handler.go +++ b/render/handler.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "github.com/go-graphite/carbonapi/pkg/parser" + "github.com/lomik/graphite-clickhouse/config" "github.com/lomik/graphite-clickhouse/finder" "github.com/lomik/graphite-clickhouse/helper/clickhouse" @@ -330,7 +331,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if len(reply) == 0 { status = http.StatusNotFound - formatter.Reply(w, r, data.EmptyResponse()) + formatter.Reply(w, r, reply) return } diff --git a/render/reply/formatter_test.go b/render/reply/formatter_test.go new file mode 100644 index 000000000..1a0dcaf63 --- /dev/null +++ b/render/reply/formatter_test.go @@ -0,0 +1,240 @@ +package reply + +import ( + "context" + "fmt" + "io" + "math" + "net/http" + "net/http/httptest" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/lomik/graphite-clickhouse/finder" + "github.com/lomik/graphite-clickhouse/helper/client" + "github.com/lomik/graphite-clickhouse/helper/point" + "github.com/lomik/graphite-clickhouse/pkg/alias" + "github.com/lomik/graphite-clickhouse/render/data" +) + +var results = []client.Metric{ + { + Name: "test.metric1", + PathExpression: "test.*", + StartTime: 1688990040, + StepTime: 60, + StopTime: 1688990520, + Values: func() []float64 { + temp := emptyValues(8) + temp[2] = 3 + return temp + }(), + }, + { + Name: "test.metric2", + PathExpression: "test.*", + StartTime: 1688990040, + StepTime: 60, + StopTime: 1688990520, + Values: emptyValues(8), + }, + { + Name: "test.metric3", + PathExpression: "test.*", + StartTime: 1688990040, + StepTime: 60, + StopTime: 1688990520, + Values: emptyValues(8), + }, +} + +func TestFormatterReply(t *testing.T) { + formatters := []struct { + impl Formatter + name string + format client.FormatType + }{ + {&V3PB{}, "v3pb", client.FormatPb_v3}, + {&V2PB{}, "v2pb", client.FormatPb_v2}, + {&JSON{}, "json", client.FormatJSON}, + {&Pickle{}, "pickle", client.FormatPickle}, + } + tests := []struct { + name string + input data.CHResponses + // result when CHResponse.AppendOutEmptySeries is false + expectedWithoutEmpty []client.Metric + // result when CHResponse.AppendOutEmptySeries is true + expectedWithEmpty []client.Metric + }{ + { + name: "no index found", + input: data.EmptyResponse(), + expectedWithoutEmpty: []client.Metric{}, + expectedWithEmpty: []client.Metric{}, + }, + { + name: "three metrics; test.metric1 with points and other with NaN", + input: prepareCHResponses(1688990000, 1688990460, + [][]byte{[]byte("test.metric1"), []byte("test.metric2"), []byte("test.metric3")}, + map[string][]point.Point{ + "test.metric1": {{Value: 3, Time: 1688990160, Timestamp: 1688990204}}, + }, + ), + expectedWithoutEmpty: results[:1], + expectedWithEmpty: results, + }, + { + name: "three metrics, no points in all", + input: prepareCHResponses(1688990000, 1688990460, + [][]byte{[]byte("test.metric1"), []byte("test.metric2"), []byte("test.metric3")}, + map[string][]point.Point{}, + ), + expectedWithoutEmpty: []client.Metric{}, + expectedWithEmpty: append([]client.Metric{ + { + Name: results[0].Name, + PathExpression: results[0].PathExpression, + StartTime: results[0].StartTime, + StopTime: results[0].StopTime, + StepTime: results[0].StepTime, + Values: emptyValues(8), + }, + }, results[1:]...), + }, + } + for _, formatter := range formatters { + t.Run(fmt.Sprintf("format=%s", formatter.name), func(t *testing.T) { + for _, tt := range tests { + // case 0: test for AppendOutEmptySeries = false + // case 1: test for AppendOutEmptySeries = true + for i := 0; i < 2; i++ { + var expected []client.Metric + var testName string + switch i { + case 0: + expected = tt.expectedWithoutEmpty + testName = fmt.Sprintf("NoAppend: %s", tt.name) + for j := range tt.input { + tt.input[j].AppendOutEmptySeries = false + } + case 1: + expected = tt.expectedWithEmpty + testName = fmt.Sprintf("WithAppend: %s", tt.name) + for j := range tt.input { + tt.input[j].AppendOutEmptySeries = true + } + } + + t.Run(testName, func(t *testing.T) { + ctx := context.Background() + // if tt.protobufDebug { + // ctx = scope.WithDebug(ctx, "Protobuf") + // } + w := httptest.NewRecorder() + r, err := http.NewRequestWithContext(ctx, "", "", nil) + if err != nil { + require.NoErrorf(t, err, "failed to create request") + } + + formatter.impl.Reply(w, r, tt.input) + response := w.Result() + defer response.Body.Close() + + // then + require.Equal(t, http.StatusOK, response.StatusCode) + data, err := io.ReadAll(response.Body) + require.NoError(t, err) + got, err := client.Decode(data, formatter.format) + require.NoError(t, err) + if !equalMetrics(expected, got) { + t.Errorf("metrics not equal: expected:\n%#v\ngot:\n%#v\n", expected, got) + } + }) + } + } + }) + } +} + +// prepareCHResponses prepares CHResponses for tests. +func prepareCHResponses(from, until int64, indices [][]byte, points map[string][]point.Point) data.CHResponses { + // alias + idx := finder.NewMockFinder(indices) + m := alias.New() + m.MergeTarget(idx, "test.*", false) + + // points + pts := point.NewPoints() + stringIndex := make([]string, 0, len(indices)) + for _, each := range indices { + stringIndex = append(stringIndex, string(each)) + } + for k, v := range points { + id := pts.MetricID(k) + for _, eachPoint := range v { + pts.AppendPoint(id, eachPoint.Value, eachPoint.Time, eachPoint.Timestamp) + } + } + pts.SetAggregations(map[string][]string{ + "avg": stringIndex, + }) + sort.Sort(pts) + return data.CHResponses{{ + Data: &data.Data{ + Points: pts, + AM: m, + CommonStep: 60, + }, + From: from, + Until: until, + }} +} + +// emptyValues prefill slice of `size` with math.NaN +func emptyValues(size int) []float64 { + arr := make([]float64, 0, size) + for i := 0; i < size; i++ { + arr = append(arr, math.NaN()) + } + return arr +} + +// equalMetrics returns true if two slices of client.Metric are equal. +// This function only compares important fields of client.Metric. +func equalMetrics(m1, m2 []client.Metric) bool { + if len(m1) != len(m2) { + return false + } + sort.Slice(m1, func(i, j int) bool { + return m1[i].Name < m1[j].Name + }) + sort.Slice(m2, func(i, j int) bool { + return m2[i].Name < m2[j].Name + }) + for i := 0; i < len(m1); i++ { + // compare props + if m1[i].Name != m2[i].Name || + m1[i].StartTime != m2[i].StartTime || + m1[i].StopTime != m2[i].StopTime || + m1[i].StepTime != m2[i].StepTime { + return false + } + // compare values + if len(m1[i].Values) != len(m2[i].Values) { + return false + } + for j := 0; j < len(m1[i].Values); j++ { + a, b := m1[i].Values[j], m2[i].Values[j] + if math.IsNaN(a) && math.IsNaN(b) { + continue + } + if a != b { + return false + } + } + } + return true +} diff --git a/render/reply/json.go b/render/reply/json.go index 0e7ce5f78..8c8f0dd0f 100644 --- a/render/reply/json.go +++ b/render/reply/json.go @@ -9,9 +9,10 @@ import ( "net/http" v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb" + "go.uber.org/zap" + "github.com/lomik/graphite-clickhouse/pkg/scope" "github.com/lomik/graphite-clickhouse/render/data" - "go.uber.org/zap" ) // JSON is an implementation of carbonapi_v3_pb MultiGlobRequest and MultiFetchResponse interconnection. It accepts the @@ -104,10 +105,8 @@ func (*JSON) Reply(w http.ResponseWriter, r *http.Request, multiData data.CHResp mfr, err := multiData.ToMultiFetchResponseV3() if err != nil { http.Error(w, fmt.Sprintf("failed to convert response to v3pb.MultiFetchResponse: %v", err), http.StatusInternalServerError) + return } response := marshalJSON(mfr) - if err != nil { - http.Error(w, fmt.Sprintf("failed to convert v3pb.MultiFetchResponse to JSON: %v", err), http.StatusInternalServerError) - } w.Write(response) } diff --git a/render/reply/pickle.go b/render/reply/pickle.go index 674597d8b..067e13d67 100644 --- a/render/reply/pickle.go +++ b/render/reply/pickle.go @@ -8,11 +8,12 @@ import ( "net/http" "time" + graphitePickle "github.com/lomik/graphite-pickle" + "go.uber.org/zap" + "github.com/lomik/graphite-clickhouse/helper/point" "github.com/lomik/graphite-clickhouse/pkg/scope" "github.com/lomik/graphite-clickhouse/render/data" - graphitePickle "github.com/lomik/graphite-pickle" - "go.uber.org/zap" ) // Pickle is a formatter for python object serialization format. @@ -40,7 +41,7 @@ func (*Pickle) Reply(w http.ResponseWriter, r *http.Request, multiData data.CHRe ) }() - if data.Len() == 0 { + if data.AM.Len() == 0 { w.Write(graphitePickle.EmptyList) return } @@ -89,19 +90,21 @@ func (*Pickle) Reply(w http.ResponseWriter, r *http.Request, multiData data.CHRe p.SetItem() p.String("start") - p.Uint32(uint32(start)) + p.Uint32(start) p.SetItem() p.String("end") - p.Uint32(uint32(end)) + p.Uint32(end) p.SetItem() p.Append() pickleTime += time.Since(pickleStart) } - writeMetric := func(points []point.Point) error { + // write points and mark as written in writeMap + writeMetric := func(points []point.Point, writeMap map[string]struct{}) error { metricName := data.MetricName(points[0].MetricID) + writeMap[metricName] = struct{}{} step, err := data.GetStep(points[0].MetricID) if err != nil { logger.Error("fail to get step", zap.Error(err)) @@ -115,15 +118,27 @@ func (*Pickle) Reply(w http.ResponseWriter, r *http.Request, multiData data.CHRe } nextMetric := data.GroupByMetric() + writtenMetrics := make(map[string]struct{}) + // fill metrics with points for { points := nextMetric() if len(points) == 0 { break } - if err := writeMetric(points); err != nil { + if err := writeMetric(points, writtenMetrics); err != nil { return } } + // fill metrics without points with NaN + if multiData[0].AppendOutEmptySeries && len(writtenMetrics) != data.AM.Len() && data.CommonStep > 0 { + for _, metricName := range data.AM.Series(false) { + if _, done := writtenMetrics[metricName]; !done { + for _, a := range data.AM.Get(metricName) { + writeAlias(a.DisplayName, a.Target, []point.Point{}, uint32(data.CommonStep)) + } + } + } + } p.Stop() } diff --git a/render/reply/protobuf.go b/render/reply/protobuf.go index 5216983a9..5f180c5ae 100644 --- a/render/reply/protobuf.go +++ b/render/reply/protobuf.go @@ -8,10 +8,11 @@ import ( "math" "net/http" + "go.uber.org/zap" + "github.com/lomik/graphite-clickhouse/helper/point" "github.com/lomik/graphite-clickhouse/pkg/scope" "github.com/lomik/graphite-clickhouse/render/data" - "go.uber.org/zap" ) var pbVarints []byte @@ -42,18 +43,19 @@ func replyProtobuf(p pb, w http.ResponseWriter, r *http.Request, multiData data. from := uint32(d.From) until := uint32(d.Until) - if data.Len() == 0 { - continue - } totalWritten++ nextMetric := data.GroupByMetric() + writtenMetrics := make(map[string]struct{}) + + // fill metrics with points for { points := nextMetric() if len(points) == 0 { break } metricName := data.MetricName(points[0].MetricID) + writtenMetrics[metricName] = struct{}{} step, err := data.GetStep(points[0].MetricID) if err != nil { logger.Error("fail to get step", zap.Error(err)) @@ -71,6 +73,17 @@ func replyProtobuf(p pb, w http.ResponseWriter, r *http.Request, multiData data. p.writeBody(writer, a.Target, a.DisplayName, function, from, until, step, points) } } + + // fill metrics without points with NaN + if d.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 { + for _, metricName := range data.AM.Series(false) { + if _, done := writtenMetrics[metricName]; !done { + for _, a := range data.AM.Get(metricName) { + p.writeBody(writer, a.Target, a.DisplayName, "any", from, until, uint32(data.CommonStep), []point.Point{}) + } + } + } + } } if totalWritten == 0 { diff --git a/render/reply/v3_pb_test.go b/render/reply/v3_pb_test.go index 5b746151e..07adbff83 100644 --- a/render/reply/v3_pb_test.go +++ b/render/reply/v3_pb_test.go @@ -8,6 +8,7 @@ import ( "testing" v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb" + "github.com/lomik/graphite-clickhouse/helper/point" ) @@ -132,13 +133,21 @@ func TestV3PBWriteBody(t *testing.T) { } if len(resp.Metrics) != len(tt.response.Metrics) { - t.Fatalf("incorrect amount of metrics, expected %v, got %v", len(resp.Metrics), len(tt.response.Metrics)) + t.Fatalf( + "incorrect amount of metrics, expected %v, got %v", + len(resp.Metrics), + len(tt.response.Metrics), + ) } for i := range resp.Metrics { if resp.Metrics[i].Name != tt.response.Metrics[i].Name { if !reflect.DeepEqual(resp.Metrics[i], tt.response.Metrics[i]) { - t.Fatalf("replies are not same.\ngot:\n%+v\n\nexpected:\n%+v", resp.Metrics[i], tt.response.Metrics[i]) + t.Fatalf( + "replies are not same.\ngot:\n%+v\n\nexpected:\n%+v", + resp.Metrics[i], + tt.response.Metrics[i], + ) } } } diff --git a/tests/emptyseries_append/carbon-clickhouse.conf.tpl b/tests/emptyseries_append/carbon-clickhouse.conf.tpl new file mode 100644 index 000000000..41d7ce56d --- /dev/null +++ b/tests/emptyseries_append/carbon-clickhouse.conf.tpl @@ -0,0 +1,45 @@ +[common] + +[data] +path = "/etc/carbon-clickhouse/data" +chunk-interval = "1s" +chunk-auto-interval = "" + +[upload.graphite_index] +type = "index" +table = "graphite_index" +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +cache-ttl = "1h" + +[upload.graphite_tags] +type = "tagged" +table = "graphite_tags" +threads = 3 +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +cache-ttl = "1h" + +[upload.graphite_reverse] +type = "points-reverse" +table = "graphite_reverse" +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +zero-timestamp = false + +[upload.graphite] +type = "points" +table = "graphite" +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +zero-timestamp = false + +[tcp] +listen = ":2003" +enabled = true +drop-future = "0s" +drop-past = "0s" + +[logging] +file = "/etc/carbon-clickhouse/carbon-clickhouse.log" +level = "debug" diff --git a/tests/emptyseries_append/graphite-clickhouse.conf.tpl b/tests/emptyseries_append/graphite-clickhouse.conf.tpl new file mode 100644 index 000000000..a5ff5c6a4 --- /dev/null +++ b/tests/emptyseries_append/graphite-clickhouse.conf.tpl @@ -0,0 +1,34 @@ +[common] +listen = "{{ .GCH_ADDR }}" +max-cpu = 0 +max-metrics-in-render-answer = 10000 +max-metrics-per-target = 10000 +headers-to-log = [ "X-Ctx-Carbonapi-Uuid" ] +append-empty-series = true + +[clickhouse] +url = "{{ .CLICKHOUSE_URL }}/?max_rows_to_read=500000000&max_result_bytes=1073741824&readonly=2&log_queries=1" +data-timeout = "30s" + +index-table = "graphite_index" +index-use-daily = true +index-timeout = "1m" +internal-aggregation = true + +tagged-table = "graphite_tags" +tagged-autocomplete-days = 1 + +[[data-table]] +# # clickhouse table name +table = "graphite" +# # points in table are stored with reverse path +reverse = false +rollup-conf = "auto" + +[[logging]] +logger = "" +file = "{{ .GCH_DIR }}/graphite-clickhouse.log" +level = "info" +encoding = "json" +encoding-time = "iso8601" +encoding-duration = "seconds" diff --git a/tests/emptyseries_append/test.toml b/tests/emptyseries_append/test.toml new file mode 100644 index 000000000..827512eac --- /dev/null +++ b/tests/emptyseries_append/test.toml @@ -0,0 +1,115 @@ +[test] +precision = "10s" + +[[test.clickhouse]] +version = "21.3" +dir = "tests/clickhouse/rollup" + +[[test.clickhouse]] +version = "22.8" +dir = "tests/clickhouse/rollup" + +[[test.clickhouse]] +version = "latest" +dir = "tests/clickhouse/rollup" + +[test.carbon_clickhouse] +template = "carbon-clickhouse.conf.tpl" + +[[test.graphite_clickhouse]] +template = "graphite-clickhouse.conf.tpl" + +[[test.input]] +name = "test.avg" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow+21"}, {value = 2.0, time = "rnow+30"}] + +[[test.input]] +name = "test.sum" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow-10"}, {value = 2.0, time = "rnow"}] + +[[test.input]] +name = "test.min" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow-10"}, {value = 2.0, time = "rnow"}] + +[[test.input]] +name = "test.max" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow-10"}, {value = 2.0, time = "rnow", delay = "5s"}] + +# Test aggregations +[[test.input]] +name = "test.avg" +points = [{value = 0.0, time = "rnow-30"}, {value = 4.0, time = "rnow+30"}] + +# Test aggregations +[[test.input]] +name = "test.sum" +points = [{value = 0.0, time = "rnow-1"}, {value = 4.0, time = "rnow+1"}] + +# Test aggregations +[[test.input]] +name = "test.min" +points = [{value = 0.0, time = "rnow-1"}, {value = 4.0, time = "rnow+1"}] + +# Test aggregations +[[test.input]] +name = "test.max" +points = [{value = 0.0, time = "rnow-1"}, {value = 4.0, time = "rnow+1"}] + +########################################################################## +# Aggregated, Deduplication not work with internal aggregation +########################################################################## + +[[test.render_checks]] +name = "Test rollup" +from = "rnow-10" +until = "rnow+10" +targets = [ + "test.{avg,min,max,sum}" +] + +[[test.render_checks.result]] +name = "test.avg" +path = "test.{avg,min,max,sum}" +consolidation = "any" +start = "rnow-10" +stop = "rnow+20" +step = 10 +req_start = "rnow-10" +req_stop = "rnow+20" +values = [nan, nan, nan] + +[[test.render_checks.result]] +name = "test.sum" +path = "test.{avg,min,max,sum}" +consolidation = "sum" +start = "rnow-10" +stop = "rnow+20" +step = 10 +req_start = "rnow-10" +req_stop = "rnow+20" +values = [1.0, 6.0, nan] + +[[test.render_checks.result]] +name = "test.min" +path = "test.{avg,min,max,sum}" +consolidation = "min" +start = "rnow-10" +stop = "rnow+20" +step = 10 +req_start = "rnow-10" +req_stop = "rnow+20" +values = [0.0, 2.0, nan] + +[[test.render_checks.result]] +name = "test.max" +path = "test.{avg,min,max,sum}" +consolidation = "max" +start = "rnow-10" +stop = "rnow+20" +step = 10 +req_start = "rnow-10" +req_stop = "rnow+20" +values = [1.0, 4.0, nan] + +# End - Test rollup +########################################################################## diff --git a/tests/emptyseries_noappend/carbon-clickhouse.conf.tpl b/tests/emptyseries_noappend/carbon-clickhouse.conf.tpl new file mode 100644 index 000000000..41d7ce56d --- /dev/null +++ b/tests/emptyseries_noappend/carbon-clickhouse.conf.tpl @@ -0,0 +1,45 @@ +[common] + +[data] +path = "/etc/carbon-clickhouse/data" +chunk-interval = "1s" +chunk-auto-interval = "" + +[upload.graphite_index] +type = "index" +table = "graphite_index" +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +cache-ttl = "1h" + +[upload.graphite_tags] +type = "tagged" +table = "graphite_tags" +threads = 3 +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +cache-ttl = "1h" + +[upload.graphite_reverse] +type = "points-reverse" +table = "graphite_reverse" +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +zero-timestamp = false + +[upload.graphite] +type = "points" +table = "graphite" +url = "{{ .CLICKHOUSE_URL }}/" +timeout = "2m30s" +zero-timestamp = false + +[tcp] +listen = ":2003" +enabled = true +drop-future = "0s" +drop-past = "0s" + +[logging] +file = "/etc/carbon-clickhouse/carbon-clickhouse.log" +level = "debug" diff --git a/tests/emptyseries_noappend/graphite-clickhouse.conf.tpl b/tests/emptyseries_noappend/graphite-clickhouse.conf.tpl new file mode 100644 index 000000000..a3bfa0fc9 --- /dev/null +++ b/tests/emptyseries_noappend/graphite-clickhouse.conf.tpl @@ -0,0 +1,34 @@ +[common] +listen = "{{ .GCH_ADDR }}" +max-cpu = 0 +max-metrics-in-render-answer = 10000 +max-metrics-per-target = 10000 +headers-to-log = [ "X-Ctx-Carbonapi-Uuid" ] +append-empty-series = false + +[clickhouse] +url = "{{ .CLICKHOUSE_URL }}/?max_rows_to_read=500000000&max_result_bytes=1073741824&readonly=2&log_queries=1" +data-timeout = "30s" + +index-table = "graphite_index" +index-use-daily = true +index-timeout = "1m" +internal-aggregation = true + +tagged-table = "graphite_tags" +tagged-autocomplete-days = 1 + +[[data-table]] +# # clickhouse table name +table = "graphite" +# # points in table are stored with reverse path +reverse = false +rollup-conf = "auto" + +[[logging]] +logger = "" +file = "{{ .GCH_DIR }}/graphite-clickhouse.log" +level = "info" +encoding = "json" +encoding-time = "iso8601" +encoding-duration = "seconds" diff --git a/tests/emptyseries_noappend/test.toml b/tests/emptyseries_noappend/test.toml new file mode 100644 index 000000000..2a0832ed1 --- /dev/null +++ b/tests/emptyseries_noappend/test.toml @@ -0,0 +1,104 @@ +[test] +precision = "10s" + +[[test.clickhouse]] +version = "21.3" +dir = "tests/clickhouse/rollup" + +[[test.clickhouse]] +version = "22.8" +dir = "tests/clickhouse/rollup" + +[[test.clickhouse]] +version = "latest" +dir = "tests/clickhouse/rollup" + +[test.carbon_clickhouse] +template = "carbon-clickhouse.conf.tpl" + +[[test.graphite_clickhouse]] +template = "graphite-clickhouse.conf.tpl" + +[[test.input]] +name = "test.avg" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow+21"}, {value = 2.0, time = "rnow+30"}] + +[[test.input]] +name = "test.sum" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow-10"}, {value = 2.0, time = "rnow"}] + +[[test.input]] +name = "test.min" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow-10"}, {value = 2.0, time = "rnow"}] + +[[test.input]] +name = "test.max" +points = [{value = 3.0, time = "rnow-30"}, {value = 0.0, time = "rnow-20"}, {value = 1.0, time = "rnow-10"}, {value = 2.0, time = "rnow", delay = "5s"}] + +# Test aggregations +[[test.input]] +name = "test.avg" +points = [{value = 0.0, time = "rnow-30"}, {value = 4.0, time = "rnow+30"}] + +# Test aggregations +[[test.input]] +name = "test.sum" +points = [{value = 0.0, time = "rnow-1"}, {value = 4.0, time = "rnow+1"}] + +# Test aggregations +[[test.input]] +name = "test.min" +points = [{value = 0.0, time = "rnow-1"}, {value = 4.0, time = "rnow+1"}] + +# Test aggregations +[[test.input]] +name = "test.max" +points = [{value = 0.0, time = "rnow-1"}, {value = 4.0, time = "rnow+1"}] + +########################################################################## +# Aggregated, Deduplication not work with internal aggregation +########################################################################## + +[[test.render_checks]] +name = "Test rollup" +from = "rnow-10" +until = "rnow+10" +targets = [ + "test.{avg,min,max,sum}" +] + +[[test.render_checks.result]] +name = "test.sum" +path = "test.{avg,min,max,sum}" +consolidation = "sum" +start = "rnow-10" +stop = "rnow+20" +step = 10 +req_start = "rnow-10" +req_stop = "rnow+20" +values = [1.0, 6.0, nan] + +[[test.render_checks.result]] +name = "test.min" +path = "test.{avg,min,max,sum}" +consolidation = "min" +start = "rnow-10" +stop = "rnow+20" +step = 10 +req_start = "rnow-10" +req_stop = "rnow+20" +values = [0.0, 2.0, nan] + +[[test.render_checks.result]] +name = "test.max" +path = "test.{avg,min,max,sum}" +consolidation = "max" +start = "rnow-10" +stop = "rnow+20" +step = 10 +req_start = "rnow-10" +req_stop = "rnow+20" +values = [1.0, 4.0, nan] + +# End - Test rollup +##########################################################################