return empty series if metrics exist in index and have no points (#240)
lordvidex authored Jul 27, 2023
1 parent b5ac900 commit dd45bed
Showing 22 changed files with 849 additions and 80 deletions.
6 changes: 4 additions & 2 deletions Makefile
@@ -15,6 +15,8 @@ else
VERSION:=$(shell sh -c 'git describe --always --tags | sed -e "s/^v//i"')
endif

OS ?= linux

SRCS:=$(shell find . -name '*.go')

all: $(NAME)
@@ -58,10 +60,10 @@ client: $(NAME)
gox-build:
rm -rf out
mkdir -p out
gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="linux" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME)
gox -ldflags '-X main.BuildVersion=$(VERSION)' -os="$(OS)" -arch="amd64" -arch="arm64" -output="out/$(NAME)-{{.OS}}-{{.Arch}}" github.com/lomik/$(NAME)
ls -la out/
mkdir -p out/root/etc/$(NAME)/
./out/$(NAME)-linux-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf
./out/$(NAME)-$(OS)-amd64 -config-print-default > out/root/etc/$(NAME)/$(NAME).conf

fpm-deb:
$(MAKE) fpm-build-deb ARCH=amd64
2 changes: 1 addition & 1 deletion cmd/e2e-test/main.go
@@ -145,7 +145,7 @@ func main() {
verifyCount := 0
verifyFailed := 0

_, err = cmdExec(DockerBinary, "network", "exists", DockerNetwork)
_, err = cmdExec(DockerBinary, "network", "inspect", DockerNetwork)
if err != nil {
out, err := cmdExec(DockerBinary, "network", "create", DockerNetwork)
if err != nil {
7 changes: 4 additions & 3 deletions config/config.go
@@ -18,12 +18,13 @@ import (
toml "github.com/pelletier/go-toml"
"go.uber.org/zap"

"github.com/lomik/zapwriter"

"github.com/lomik/graphite-clickhouse/cache"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/limiter"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/zapwriter"
)

// Cache config
@@ -47,12 +48,12 @@ type Common struct {
MaxCPU int `toml:"max-cpu" json:"max-cpu"`
MaxMetricsInFindAnswer int `toml:"max-metrics-in-find-answer" json:"max-metrics-in-find-answer" comment:"limit number of results from find query, 0=unlimited"`
MaxMetricsPerTarget int `toml:"max-metrics-per-target" json:"max-metrics-per-target" comment:"limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited"`
AppendEmptySeries bool `toml:"append-empty-series" json:"append-empty-series" comment:"if true, always return points for all metrics, replacing empty results with list of NaN"`
TargetBlacklist []string `toml:"target-blacklist" json:"target-blacklist" comment:"daemon returns empty response if query matches any of regular expressions" commented:"true"`
Blacklist []*regexp.Regexp `toml:"-" json:"-"` // compiled TargetBlacklist
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`
FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

FindCache cache.BytesCache `toml:"-" json:"-"`
}
2 changes: 2 additions & 0 deletions doc/config.md
@@ -182,6 +182,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
max-metrics-in-find-answer = 0
# limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited
max-metrics-per-target = 15000
# if true, always return points for all metrics, replacing empty results with list of NaN
append-empty-series = false
# daemon returns empty response if query matches any of regular expressions
# target-blacklist = []
# daemon will return the freed memory to the OS when it>0
62 changes: 56 additions & 6 deletions helper/client/render.go
@@ -2,6 +2,7 @@ package client

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
@@ -120,14 +121,25 @@ func Render(client *http.Client, address string, format FormatType, targets []st
return queryParams, nil, resp.Header, NewHttpError(resp.StatusCode, string(b))
}

var metrics []Metric
metrics, err := Decode(b, format)
if err != nil {
return queryParams, nil, resp.Header, err
}
return queryParams, metrics, resp.Header, nil
}

// Decode converts data in the given format to a slice of Metric
func Decode(b []byte, format FormatType) ([]Metric, error) {
var (
metrics []Metric
err error
)
switch format {
case FormatPb_v3:
var r protov3.MultiFetchResponse
err = r.Unmarshal(b)
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
@@ -150,7 +162,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
var r protov2.MultiFetchResponse
err = r.Unmarshal(b)
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
@@ -173,7 +185,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
decoder := pickle.NewDecoder(reader)
p, err := decoder.Decode()
if err != nil {
return queryParams, nil, resp.Header, err
return nil, err
}
for _, v := range p.([]interface{}) {
m := v.(map[interface{}]interface{})
@@ -195,9 +207,47 @@ func Render(client *http.Client, address string, format FormatType, targets []st
Values: values,
})
}
case FormatJSON:
var r jsonResponse
err = json.Unmarshal(b, &r)
if err != nil {
return nil, err
}
metrics = make([]Metric, 0, len(r.Metrics))
for _, m := range r.Metrics {
values := make([]float64, len(m.Values))
for i, v := range m.Values {
if v == nil {
values[i] = math.NaN()
} else {
values[i] = *v
}
}
metrics = append(metrics, Metric{
Name: m.Name,
PathExpression: m.PathExpression,
StartTime: m.StartTime,
StopTime: m.StopTime,
StepTime: m.StepTime,
Values: values,
})
}
default:
return queryParams, nil, resp.Header, ErrUnsupportedFormat
return nil, ErrUnsupportedFormat
}
return metrics, nil
}

return queryParams, metrics, resp.Header, nil
// jsonResponse is a simple struct to decode JSON responses for testing purposes
type jsonResponse struct {
Metrics []jsonMetric `json:"metrics"`
}

type jsonMetric struct {
Name string `json:"name"`
PathExpression string `json:"pathExpression"`
Values []*float64 `json:"values"`
StartTime int64 `json:"startTime"`
StopTime int64 `json:"stopTime"`
StepTime int64 `json:"stepTime"`
}
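
The new Decode helper makes response parsing reusable outside of Render. As a rough illustration, here is a hypothetical standalone sketch that is not part of this commit; the helper/client import path and the JSON field names are taken from the diff above.

package main

import (
	"fmt"
	"log"

	"github.com/lomik/graphite-clickhouse/helper/client"
)

func main() {
	// Hand-written body matching the jsonResponse/jsonMetric shape above;
	// the FormatJSON branch turns null values into NaN.
	body := []byte(`{"metrics":[{"name":"a.b.c","pathExpression":"a.b.*",
		"values":[1,null,3],"startTime":0,"stopTime":30,"stepTime":10}]}`)

	metrics, err := client.Decode(body, client.FormatJSON)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(metrics[0].Name, metrics[0].Values) // a.b.c [1 NaN 3]
}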
5 changes: 4 additions & 1 deletion helper/point/func.go
@@ -59,7 +59,10 @@ func FillNulls(points []Point, from, until, step uint32) (start, stop, count uin
count = (stop - start) / step
last := start - step
currentPoint := 0
metricID := points[0].MetricID
var metricID uint32
if len(points) > 0 {
metricID = points[0].MetricID
}
getter = func() (float64, error) {
if stop <= last {
return 0, ErrTimeGreaterStop
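
With the length guard above, FillNulls no longer dereferences points[0] when the slice is empty, which is what allows the daemon to emit NaN-only series. Below is a hypothetical standalone sketch of that behaviour; the argument values are made up and the import path is assumed from the repository layout.

package main

import (
	"errors"
	"fmt"

	"github.com/lomik/graphite-clickhouse/helper/point"
)

func main() {
	// An empty points slice: every step between start and stop is filled with NaN.
	start, stop, count, next := point.FillNulls(nil, 100, 160, 10)
	fmt.Println(start, stop, count)

	for {
		v, err := next()
		if err != nil {
			if errors.Is(err, point.ErrTimeGreaterStop) {
				break // reached the end of the series
			}
			panic(err) // any other error means the points are corrupted
		}
		fmt.Println(v) // NaN for an empty series
	}
}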
122 changes: 85 additions & 37 deletions render/data/ch_response.go
@@ -6,6 +6,7 @@ import (

v2pb "github.com/go-graphite/protocol/carbonapi_v2_pb"
v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/lomik/graphite-clickhouse/helper/point"
)

@@ -14,31 +15,24 @@ type CHResponse struct {
Data *Data
From int64
Until int64
// if true, return points for all metrics, replacing empty results with list of NaN
AppendOutEmptySeries bool
}

// CHResponses is a slice of CHResponse
type CHResponses []CHResponse

// EmptyResponse returns a CHResponses with one element containing emptyData for the following encoding
func EmptyResponse() CHResponses { return CHResponses{{emptyData, 0, 0}} }
func EmptyResponse() CHResponses { return CHResponses{{Data: emptyData}} }

// ToMultiFetchResponseV2 returns protobuf v2pb.MultiFetchResponse message for given CHResponse
func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error) {
mfr := &v2pb.MultiFetchResponse{Metrics: make([]v2pb.FetchResponse, 0)}
data := c.Data
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
start, stop, count, getValue := point.FillNulls(points, uint32(c.From), uint32(c.Until), step)

addResponse := func(name string, step uint32, points []point.Point) error {
from, until := uint32(c.From), uint32(c.Until)
start, stop, count, getValue := point.FillNulls(points, from, until, step)
values := make([]float64, 0, count)
isAbsent := make([]bool, 0, count)
for {
@@ -48,15 +42,15 @@ func (c *CHResponse) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error)
break
}
// if err is not point.ErrTimeGreaterStop, the points are corrupted
return nil, err
return err
}
if math.IsNaN(value) {
values = append(values, 0)
isAbsent = append(isAbsent, true)
continue
} else {
values = append(values, value)
isAbsent = append(isAbsent, false)
}
values = append(values, value)
isAbsent = append(isAbsent, false)
}
for _, a := range data.AM.Get(name) {
fr := v2pb.FetchResponse{
@@ -69,6 +63,38 @@ }
}
mfr.Metrics = append(mfr.Metrics, fr)
}
return nil
}

// process metrics with points
writtenMetrics := make(map[string]struct{})
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
writtenMetrics[name] = struct{}{}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
if err := addResponse(name, step, points); err != nil {
return nil, err
}
}
// process metrics with no points
if c.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 {
for _, metricName := range data.AM.Series(false) {
if _, done := writtenMetrics[metricName]; !done {
err := addResponse(metricName, uint32(data.CommonStep), []point.Point{})
if err != nil {
return nil, err
}
}
}
}
return mfr, nil
}
Expand All @@ -90,23 +116,9 @@ func (cc *CHResponses) ToMultiFetchResponseV2() (*v2pb.MultiFetchResponse, error
func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error) {
mfr := &v3pb.MultiFetchResponse{Metrics: make([]v3pb.FetchResponse, 0)}
data := c.Data
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
consolidationFunc, err := data.GetAggregation(id)
if err != nil {
return nil, err
}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
start, stop, count, getValue := point.FillNulls(points, uint32(c.From), uint32(c.Until), step)
addResponse := func(name, function string, step uint32, points []point.Point) error {
from, until := uint32(c.From), uint32(c.Until)
start, stop, count, getValue := point.FillNulls(points, from, until, step)
values := make([]float64, 0, count)
for {
value, err := getValue()
@@ -115,15 +127,15 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error)
break
}
// if err is not point.ErrTimeGreaterStop, the points are corrupted
return nil, err
return err
}
values = append(values, value)
}
for _, a := range data.AM.Get(name) {
fr := v3pb.FetchResponse{
Name: a.DisplayName,
PathExpression: a.Target,
ConsolidationFunc: consolidationFunc,
ConsolidationFunc: function,
StartTime: int64(start),
StopTime: int64(stop),
StepTime: int64(step),
@@ -135,6 +147,42 @@ }
}
mfr.Metrics = append(mfr.Metrics, fr)
}
return nil
}

// process metrics with points
writtenMetrics := make(map[string]struct{})
nextMetric := data.GroupByMetric()
for {
points := nextMetric()
if len(points) == 0 {
break
}
id := points[0].MetricID
name := data.MetricName(id)
writtenMetrics[name] = struct{}{}
consolidationFunc, err := data.GetAggregation(id)
if err != nil {
return nil, err
}
step, err := data.GetStep(id)
if err != nil {
return nil, err
}
if err := addResponse(name, consolidationFunc, step, points); err != nil {
return nil, err
}
}
// process metrics with no points
if c.AppendOutEmptySeries && len(writtenMetrics) < data.AM.Len() && data.CommonStep > 0 {
for _, metricName := range data.AM.Series(false) {
if _, done := writtenMetrics[metricName]; !done {
err := addResponse(metricName, "any", uint32(data.CommonStep), []point.Point{})
if err != nil {
return nil, err
}
}
}
}
return mfr, nil
}
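
Downstream, a caller opts into the new behaviour through the AppendOutEmptySeries field. The sketch below is hypothetical and not part of this commit: the buildV3 helper, its arguments, and the wiring from the common append-empty-series option are assumptions; only the CHResponse fields and ToMultiFetchResponseV3 come from the diff above.

package sketch

import (
	v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

	"github.com/lomik/graphite-clickhouse/render/data"
)

// buildV3 is a hypothetical helper: when appendEmpty is true, metrics that are
// present in the index but have no points are returned as NaN-filled series.
func buildV3(d *data.Data, from, until int64, appendEmpty bool) (*v3pb.MultiFetchResponse, error) {
	resp := data.CHResponse{
		Data:                 d,
		From:                 from,
		Until:                until,
		AppendOutEmptySeries: appendEmpty, // presumably populated from the append-empty-series option
	}
	return resp.ToMultiFetchResponseV3()
}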