Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return empty series if metrics exist in index and has no points #240

Merged
merged 10 commits into from
Jul 27, 2023
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ else
VERSION:=$(shell sh -c 'git describe --always --tags | sed -e "s/^v//i"')
endif

OS ?= linux

SRCS:=$(shell find . -name '*.go')

all: $(NAME)
Expand Down Expand Up @@ -58,10 +60,10 @@ client: $(NAME)
gox-build:
rm -rf out
mkdir -p out
gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="linux" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME)
gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="$(OS)" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME)
ls -la out/
mkdir -p out/root/etc/$(NAME)/
./out/$(NAME)-linux-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf
./out/$(NAME)-$(OS)-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf

fpm-deb:
$(MAKE) fpm-build-deb ARCH=amd64
Expand Down
2 changes: 1 addition & 1 deletion cmd/e2e-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func main() {
verifyCount := 0
verifyFailed := 0

_, err = cmdExec(DockerBinary, "network", "exists", DockerNetwork)
_, err = cmdExec(DockerBinary, "network", "inspect", DockerNetwork)
if err != nil {
out, err := cmdExec(DockerBinary, "network", "create", DockerNetwork)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
toml "github.com/pelletier/go-toml"
"go.uber.org/zap"

"github.com/lomik/zapwriter"

"github.com/lomik/graphite-clickhouse/cache"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/limiter"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/zapwriter"
)

// Cache config
Expand All @@ -47,12 +48,12 @@ type Common struct {
MaxCPU int `toml:"max-cpu" json:"max-cpu"`
MaxMetricsInFindAnswer int `toml:"max-metrics-in-find-answer" json:"max-metrics-in-find-answer" comment:"limit number of results from find query, 0=unlimited"`
MaxMetricsPerTarget int `toml:"max-metrics-per-target" json:"max-metrics-per-target" comment:"limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited"`
AppendEmptySeries bool `toml:"append-empty-series" json:"append-empty-series" comment:"if true, always return points for all metrics, replacing empty results with list of NaN"`
TargetBlacklist []string `toml:"target-blacklist" json:"target-blacklist" comment:"daemon returns empty response if query matches any of regular expressions" commented:"true"`
Blacklist []*regexp.Regexp `toml:"-" json:"-"` // compiled TargetBlacklist
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`
FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

FindCache cache.BytesCache `toml:"-" json:"-"`
}
Expand Down
2 changes: 2 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
max-metrics-in-find-answer = 0
# limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited
max-metrics-per-target = 15000
# if true, always return points for all metrics, replacing empty results with list of NaN
append-empty-series = false
# daemon returns empty response if query matches any of regular expressions
# target-blacklist = []
# daemon will return the freed memory to the OS when it>0
Expand Down
62 changes: 56 additions & 6 deletions helper/client/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -120,14 +121,25 @@ func Render(client *http.Client, address string, format FormatType, targets []st
return queryParams, nil, resp.Header, NewHttpError(resp.StatusCode, string(b))
}

var metrics []Metric
metrics, err := Decode(b, format)
if err != nil {
return queryParams, nil, resp.Header, err
}
return queryParams, metrics, resp.Header, nil
}

// Decode converts data in the give format to a Metric
func Decode(b []byte, format FormatType) ([]Metric, error) {
var (
metrics []Metric
err error
)
switch format {
case FormatPb_v3:
var r protov3.MultiFetchResponse
err = r.Unmarshal(b)
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
Expand All @@ -150,7 +162,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
var r protov2.MultiFetchResponse
err = r.Unmarshal(b)
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
Expand All @@ -173,7 +185,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
decoder := pickle.NewDecoder(reader)
p, err := decoder.Decode()
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
for _, v := range p.([]interface{}) {
m := v.(map[interface{}]interface{})
Expand All @@ -195,9 +207,47 @@ func Render(client *http.Client, address string, format FormatType, targets []st
Values: values,
})
}
case FormatJSON:
var r jsonResponse
err = json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
values := make([]float64, len(m.Values))
for i, v := range m.Values {
if v == nil {
values[i] = math.NaN()
} else {
values[i] = *v
}
}
metrics = append(metrics, Metric{
Name: m.Name,
PathExpression: m.PathExpression,
StartTime: m.StartTime,
StopTime: m.StopTime,
StepTime: m.StepTime,
Values: values,
})
}
default:
return queryParams, nil, resp.Header, ErrUnsupportedFormat
return nil, ErrUnsupportedFormat
}
return metrics, nil
}

return queryParams, metrics, resp.Header, nil
// jsonResponse is a simple struct to decode JSON responses for testing purposes
type jsonResponse struct {
Metrics []jsonMetric `json:"metrics"`
}

type jsonMetric struct {
Name string `json:"name"`
PathExpression string `json:"pathExpression"`
Values []*float64 `json:"values"`
StartTime int64 `json:"startTime"`
StopTime int64 `json:"stopTime"`
StepTime int64 `json:"stepTime"`
}
5 changes: 4 additions & 1 deletion helper/point/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func FillNulls(points []Point, from, until, step uint32) (start, stop, count uin
count = (stop - start) / step
last := start - step
currentPoint := 0
metricID := points[0].MetricID
var metricID uint32
if len(points) > 0 {
metricID = points[0].MetricID
}
getter = func() (float64, error) {
if stop <= last {
return 0, ErrTimeGreaterStop
Expand Down
122 changes: 85 additions & 37 deletions render/data/ch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

v2pb "github.com/go-graphite/protocol/carbonapi_v2_pb"
v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/lomik/graphite-clickhouse/helper/point"
)

Expand All @@ -14,31 +15,24 @@ type CHResponse struct {
Data *Data
From int64
Until int64
// if true, return points for all metrics, replacing empty results with list of NaN
AppendOutEmptySeries bool
}

// CHResponses is a slice of CHResponse
type CHResponses []CHResponse

// EmptyResponse returns an CHResponses with one element containing emptyData for the following encoding
func EmptyResponse() CHResponses { return CHResponses{{emptyData, 0, 0}} }
func EmptyResponse() CHResponses { return CHResponses{{Data: emptyData}} }

// ToMultiFetchResponseV2 returns protobuf v2pb.MultiFetchResponse message for given CHResponse
func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error) {
mfr := &v2pb.MultiFetchResponse{Metrics: make([]v2pb.FetchResponse, 0)}
data := c.Data
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
start, stop, count, getValue := point.FillNulls(points, uint32(c.From), uint32(c.Until), step)

addResponse := func(name string, step uint32, points []point.Point) error {
from, until := uint32(c.From), uint32(c.Until)
start, stop, count, getValue := point.FillNulls(points, from, until, step)
values := make([]float64, 0, count)
isAbsent := make([]bool, 0, count)
for {
Expand All @@ -48,15 +42,15 @@ func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error)
break
}
// if err is not point.ErrTimeGreaterStop, the points are corrupted
return nil, err
return err
}
if math.IsNaN(value) {
values = append(values, 0)
isAbsent = append(isAbsent, true)
continue
} else {
values = append(values, value)
isAbsent = append(isAbsent, false)
}
values = append(values, value)
isAbsent = append(isAbsent, false)
}
for _, a := range data.AM.Get(name) {
fr := v2pb.FetchResponse{
Expand All @@ -69,6 +63,38 @@ func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error)
}
mfr.Metrics = append(mfr.Metrics, fr)
}
return nil
}

// process metrics with points
writtenMetrics := make(map[string]struct{})
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
writtenMetrics[name] = struct{}{}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
if err := addResponse(name, step, points); err != nil {
return nil, err
}
}
// process metrics with no points
if c.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 {
for _, metricName := range data.AM.Series(false) {
if _, done := writtenMetrics[metricName]; !done {
err := addResponse(metricName, uint32(data.CommonStep), []point.Point{})
if err != nil {
return nil, err
}
}
}
}
return mfr, nil
}
Expand All @@ -90,23 +116,9 @@ func (cc *CHResponses) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error
func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToMultiFetchResponseV2 also need changes for Null Points like ToMultiFetchResponseV3

mfr := &v3pb.MultiFetchResponse{Metrics: make([]v3pb.FetchResponse, 0)}
data := c.Data
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
consolidationFunc, err := data.GetAggregation(id)
if err != nil {
return nil, err
}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
start, stop, count, getValue := point.FillNulls(points, uint32(c.From), uint32(c.Until), step)
addResponse := func(name, function string, step uint32, points []point.Point) error {
from, until := uint32(c.From), uint32(c.Until)
start, stop, count, getValue := point.FillNulls(points, from, until, step)
values := make([]float64, 0, count)
for {
value, err := getValue()
Expand All @@ -115,15 +127,15 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error)
break
}
// if err is not point.ErrTimeGreaterStop, the points are corrupted
return nil, err
return err
}
values = append(values, value)
}
for _, a := range data.AM.Get(name) {
fr := v3pb.FetchResponse{
Name: a.DisplayName,
PathExpression: a.Target,
ConsolidationFunc: consolidationFunc,
ConsolidationFunc: function,
StartTime: int64(start),
StopTime: int64(stop),
StepTime: int64(step),
Expand All @@ -135,6 +147,42 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error)
}
mfr.Metrics = append(mfr.Metrics, fr)
}
return nil
}

// process metrics with points
writtenMetrics := make(map[string]struct{})
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
writtenMetrics[name] = struct{}{}
consolidationFunc, err := data.GetAggregation(id)
if err != nil {
return nil, err
}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
if err := addResponse(name, consolidationFunc, step, points); err != nil {
return nil, err
}
}
// process metrics with no points
if c.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 {
for _, metricName := range data.AM.Series(false) {
if _, done := writtenMetrics[metricName]; !done {
err := addResponse(metricName, "any", uint32(data.CommonStep), []point.Point{})
if err != nil {
return nil, err
}
}
}
}
return mfr, nil
}
Expand Down
Loading
Loading