diff --git a/config/config.go b/config/config.go index 1fbf3ee70..4c4ad63d9 100644 --- a/config/config.go +++ b/config/config.go @@ -91,8 +91,9 @@ var IndexReverse = map[string]uint8{ var IndexReverseNames = []string{"auto", "direct", "reversed"} type UserLimits struct { - MaxQueries int `toml:"max-queries" json:"max-queries" comment:"Max queries to fetch data"` - MaxConcurrent int `toml:"max-concurrent" json:"max-concurrent" comment:"Maximum concurrent queries to fetch data"` + MaxQueries int `toml:"max-queries" json:"max-queries" comment:"Max queries to fetch data"` + ConcurrentQueries int `toml:"concurrent-queries" json:"concurrent-queries" comment:"Concurrent queries to fetch data"` + AdaptiveQueries int `toml:"adaptive-queries" json:"adaptive-queries" comment:"Adaptive queries (based on load average) for increase/decrease concurrent queries"` Limiter limiter.ServerLimiter `toml:"-" json:"-"` } @@ -102,8 +103,9 @@ type QueryParam struct { URL string `toml:"url" json:"url" comment:"url for queries with durations greater or equal than"` DataTimeout time.Duration `toml:"data-timeout" json:"data-timeout" comment:"total timeout to fetch data"` - MaxQueries int `toml:"max-queries" json:"max-queries" comment:"Max queries to fetch data"` - MaxConcurrent int `toml:"max-concurrent" json:"max-concurrent" comment:"Maximum concurrent queries to fetch data"` + MaxQueries int `toml:"max-queries" json:"max-queries" comment:"Max queries to fetch data"` + ConcurrentQueries int `toml:"concurrent-queries" json:"concurrent-queries" comment:"Concurrent queries to fetch data"` + AdaptiveQueries int `toml:"adaptive-queries" json:"adaptive-queries" comment:"Adaptive queries (based on load average) for increase/decrease concurrent queries"` Limiter limiter.ServerLimiter `toml:"-" json:"-"` } @@ -134,38 +136,41 @@ func binarySearchQueryParamLe(a []QueryParam, duration time.Duration, start, end // ClickHouse config type ClickHouse struct { - URL string `toml:"url" json:"url" comment:"default url, see https://clickhouse.tech/docs/en/interfaces/http. 
Can be overwritten with query-params"` - DataTimeout time.Duration `toml:"data-timeout" json:"data-timeout" comment:"default total timeout to fetch data, can be overwritten with query-params"` - RenderMaxQueries int `toml:"render-max-queries" json:"render-max-queries" comment:"Max queries to render queiries"` - RenderMaxConcurrent int `toml:"render-max-concurrent" json:"render-max-concurrent" comment:"Maximum concurrent queries to render queiries"` - QueryParams []QueryParam `toml:"query-params" json:"query-params" comment:"customized query params (url, data timeout, limiters) for durations greater or equal"` - FindMaxQueries int `toml:"find-max-queries" json:"find-max-queries" comment:"Max queries for find queries"` - FindMaxConcurrent int `toml:"find-max-concurrent" json:"find-max-concurrent" comment:"Maximum concurrent queries for find queries"` - FindLimiter limiter.ServerLimiter `toml:"-" json:"-"` - TagsMaxQueries int `toml:"tags-max-queries" json:"tags-max-queries" comment:"Max queries for tags queries"` - TagsMaxConcurrent int `toml:"tags-max-concurrent" json:"tags-max-concurrent" comment:"Maximum concurrent queries for tags queries"` - TagsMinInQuery int `toml:"tags-min-in-query" json:"tags-min-in-query" comment:"Minimum tags in seriesByTag query"` - TagsMinInAutocomplete int `toml:"tags-min-in-autocomplete" json:"tags-min-in-autocomplete" comment:"Minimum tags in autocomplete query"` - TagsLimiter limiter.ServerLimiter `toml:"-" json:"-"` - UserLimits map[string]UserLimits `toml:"user-limits" json:"user-limits" comment:"customized query limiter for some users" commented:"true"` - DateFormat string `toml:"date-format" json:"date-format" comment:"Date format (default, utc, both)"` - IndexTable string `toml:"index-table" json:"index-table" comment:"see doc/index-table.md"` - IndexUseDaily bool `toml:"index-use-daily" json:"index-use-daily"` - IndexReverse string `toml:"index-reverse" json:"index-reverse" comment:"see doc/config.md"` - IndexReverses IndexReverses `toml:"index-reverses" json:"index-reverses" comment:"see doc/config.md" commented:"true"` - IndexTimeout time.Duration `toml:"index-timeout" json:"index-timeout" comment:"total timeout to fetch series list from index"` - TaggedTable string `toml:"tagged-table" json:"tagged-table" comment:"'tagged' table from carbon-clickhouse, required for seriesByTag"` - TaggedAutocompleDays int `toml:"tagged-autocomplete-days" json:"tagged-autocomplete-days" comment:"or how long the daemon will query tags during autocomplete"` - TaggedUseDaily bool `toml:"tagged-use-daily" json:"tagged-use-daily" comment:"whether to use date filter when searching for the metrics in the tagged-table"` - TaggedCosts map[string]*Costs `toml:"tagged-costs" json:"tagged-costs" commented:"true" comment:"costs for tags (for tune which tag will be used as primary), by default is 0, increase for costly (with poor selectivity) tags"` - TreeTable string `toml:"tree-table" json:"tree-table" comment:"old index table, DEPRECATED, see description in doc/config.md" commented:"true"` - ReverseTreeTable string `toml:"reverse-tree-table" json:"reverse-tree-table" commented:"true"` - DateTreeTable string `toml:"date-tree-table" json:"date-tree-table" commented:"true"` - DateTreeTableVersion int `toml:"date-tree-table-version" json:"date-tree-table-version" commented:"true"` - TreeTimeout time.Duration `toml:"tree-timeout" json:"tree-timeout" commented:"true"` - TagTable string `toml:"tag-table" json:"tag-table" comment:"is not recommended to use, 
https://github.com/lomik/graphite-clickhouse/wiki/TagsRU" commented:"true"` - ExtraPrefix string `toml:"extra-prefix" json:"extra-prefix" comment:"add extra prefix (directory in graphite) for all metrics, w/o trailing dot"` - ConnectTimeout time.Duration `toml:"connect-timeout" json:"connect-timeout" comment:"TCP connection timeout"` + URL string `toml:"url" json:"url" comment:"default url, see https://clickhouse.tech/docs/en/interfaces/http. Can be overwritten with query-params"` + DataTimeout time.Duration `toml:"data-timeout" json:"data-timeout" comment:"default total timeout to fetch data, can be overwritten with query-params"` + RenderMaxQueries int `toml:"render-max-queries" json:"render-max-queries" comment:"Max queries to render queiries"` + RenderConcurrentQueries int `toml:"render-concurrent-queries" json:"render-concurrent-queries" comment:"Concurrent queries to render queiries"` + RenderAdaptiveQueries int `toml:"render-adaptive-queries" json:"render-adaptive-queries" comment:"Render adaptive queries (based on load average) for increase/decrease concurrent queries"` + QueryParams []QueryParam `toml:"query-params" json:"query-params" comment:"customized query params (url, data timeout, limiters) for durations greater or equal"` + FindMaxQueries int `toml:"find-max-queries" json:"find-max-queries" comment:"Max queries for find queries"` + FindConcurrentQueries int `toml:"find-concurrent-queries" json:"find-concurrent-queries" comment:"Find concurrent queries for find queries"` + FindAdaptiveQueries int `toml:"find-adaptive-queries" json:"find-adaptive-queries" comment:"Find adaptive queries (based on load average) for increase/decrease concurrent queries"` + FindLimiter limiter.ServerLimiter `toml:"-" json:"-"` + TagsMaxQueries int `toml:"tags-max-queries" json:"tags-max-queries" comment:"Max queries for tags queries"` + TagsConcurrentQueries int `toml:"tags-concurrent-queries" json:"tags-concurrent-queries" comment:"Concurrent queries for tags queries"` + TagsAdaptiveQueries int `toml:"tags-adaptive-queries" json:"tags-adaptive-queries" comment:"Tags adaptive queries (based on load average) for increase/decrease concurrent queries"` + TagsMinInQuery int `toml:"tags-min-in-query" json:"tags-min-in-query" comment:"Minimum tags in seriesByTag query"` + TagsMinInAutocomplete int `toml:"tags-min-in-autocomplete" json:"tags-min-in-autocomplete" comment:"Minimum tags in autocomplete query"` + TagsLimiter limiter.ServerLimiter `toml:"-" json:"-"` + UserLimits map[string]UserLimits `toml:"user-limits" json:"user-limits" comment:"customized query limiter for some users" commented:"true"` + DateFormat string `toml:"date-format" json:"date-format" comment:"Date format (default, utc, both)"` + IndexTable string `toml:"index-table" json:"index-table" comment:"see doc/index-table.md"` + IndexUseDaily bool `toml:"index-use-daily" json:"index-use-daily"` + IndexReverse string `toml:"index-reverse" json:"index-reverse" comment:"see doc/config.md"` + IndexReverses IndexReverses `toml:"index-reverses" json:"index-reverses" comment:"see doc/config.md" commented:"true"` + IndexTimeout time.Duration `toml:"index-timeout" json:"index-timeout" comment:"total timeout to fetch series list from index"` + TaggedTable string `toml:"tagged-table" json:"tagged-table" comment:"'tagged' table from carbon-clickhouse, required for seriesByTag"` + TaggedAutocompleDays int `toml:"tagged-autocomplete-days" json:"tagged-autocomplete-days" comment:"or how long the daemon will query tags during autocomplete"` + 
TaggedUseDaily bool `toml:"tagged-use-daily" json:"tagged-use-daily" comment:"whether to use date filter when searching for the metrics in the tagged-table"` + TaggedCosts map[string]*Costs `toml:"tagged-costs" json:"tagged-costs" commented:"true" comment:"costs for tags (for tune which tag will be used as primary), by default is 0, increase for costly (with poor selectivity) tags"` + TreeTable string `toml:"tree-table" json:"tree-table" comment:"old index table, DEPRECATED, see description in doc/config.md" commented:"true"` + ReverseTreeTable string `toml:"reverse-tree-table" json:"reverse-tree-table" commented:"true"` + DateTreeTable string `toml:"date-tree-table" json:"date-tree-table" commented:"true"` + DateTreeTableVersion int `toml:"date-tree-table-version" json:"date-tree-table-version" commented:"true"` + TreeTimeout time.Duration `toml:"tree-timeout" json:"tree-timeout" commented:"true"` + TagTable string `toml:"tag-table" json:"tag-table" comment:"is not recommended to use, https://github.com/lomik/graphite-clickhouse/wiki/TagsRU" commented:"true"` + ExtraPrefix string `toml:"extra-prefix" json:"extra-prefix" comment:"add extra prefix (directory in graphite) for all metrics, w/o trailing dot"` + ConnectTimeout time.Duration `toml:"connect-timeout" json:"connect-timeout" comment:"TCP connection timeout"` // TODO: remove in v0.14 DataTableLegacy string `toml:"data-table" json:"data-table" comment:"will be removed in 0.14" commented:"true"` // TODO: remove in v0.14 @@ -457,8 +462,8 @@ func Unmarshal(body []byte, noLog bool) (*Config, error) { cfg.Logging = make([]zapwriter.Config, 0) } - if cfg.ClickHouse.RenderMaxConcurrent > cfg.ClickHouse.RenderMaxQueries && cfg.ClickHouse.RenderMaxQueries > 0 { - cfg.ClickHouse.RenderMaxConcurrent = 0 + if cfg.ClickHouse.RenderConcurrentQueries > cfg.ClickHouse.RenderMaxQueries && cfg.ClickHouse.RenderMaxQueries > 0 { + cfg.ClickHouse.RenderConcurrentQueries = 0 } if err := clickhouseUrlValidate(cfg.ClickHouse.URL); err != nil { @@ -466,8 +471,8 @@ func Unmarshal(body []byte, noLog bool) (*Config, error) { } for i := range cfg.ClickHouse.QueryParams { - if cfg.ClickHouse.QueryParams[i].MaxConcurrent > cfg.ClickHouse.QueryParams[i].MaxQueries && cfg.ClickHouse.QueryParams[i].MaxQueries > 0 { - cfg.ClickHouse.QueryParams[i].MaxConcurrent = 0 + if cfg.ClickHouse.QueryParams[i].ConcurrentQueries > cfg.ClickHouse.QueryParams[i].MaxQueries && cfg.ClickHouse.QueryParams[i].MaxQueries > 0 { + cfg.ClickHouse.QueryParams[i].ConcurrentQueries = 0 } if cfg.ClickHouse.QueryParams[i].Duration == 0 { @@ -488,7 +493,8 @@ func Unmarshal(body []byte, noLog bool) (*Config, error) { cfg.ClickHouse.QueryParams = append( []QueryParam{{ URL: cfg.ClickHouse.URL, DataTimeout: cfg.ClickHouse.DataTimeout, - MaxQueries: cfg.ClickHouse.RenderMaxQueries, MaxConcurrent: cfg.ClickHouse.RenderMaxConcurrent, + MaxQueries: cfg.ClickHouse.RenderMaxQueries, ConcurrentQueries: cfg.ClickHouse.RenderConcurrentQueries, + AdaptiveQueries: cfg.ClickHouse.RenderAdaptiveQueries, }}, cfg.ClickHouse.QueryParams..., ) @@ -605,31 +611,62 @@ func Unmarshal(body []byte, noLog bool) (*Config, error) { } } - if cfg.ClickHouse.FindMaxConcurrent > cfg.ClickHouse.FindMaxQueries && cfg.ClickHouse.FindMaxQueries > 0 { - cfg.ClickHouse.FindMaxConcurrent = 0 + if cfg.ClickHouse.FindConcurrentQueries > cfg.ClickHouse.FindMaxQueries && cfg.ClickHouse.FindMaxQueries > 0 { + cfg.ClickHouse.FindConcurrentQueries = 0 } - if cfg.ClickHouse.TagsMaxConcurrent > cfg.ClickHouse.TagsMaxQueries && 
cfg.ClickHouse.TagsMaxQueries > 0 { - cfg.ClickHouse.TagsMaxConcurrent = 0 + if cfg.ClickHouse.TagsConcurrentQueries > cfg.ClickHouse.TagsMaxQueries && cfg.ClickHouse.TagsMaxQueries > 0 { + cfg.ClickHouse.TagsConcurrentQueries = 0 } metricsEnabled := cfg.setupGraphiteMetrics() - cfg.ClickHouse.FindLimiter = limiter.NewWLimiter(cfg.ClickHouse.FindMaxQueries, cfg.ClickHouse.FindMaxConcurrent, metricsEnabled, "find", "all") + cfg.ClickHouse.FindLimiter = limiter.NewALimiter( + cfg.ClickHouse.FindMaxQueries, cfg.ClickHouse.FindConcurrentQueries, cfg.ClickHouse.FindAdaptiveQueries, + metricsEnabled, "find", "all", + ) - cfg.ClickHouse.TagsLimiter = limiter.NewWLimiter(cfg.ClickHouse.TagsMaxQueries, cfg.ClickHouse.TagsMaxConcurrent, metricsEnabled, "tags", "all") + cfg.ClickHouse.TagsLimiter = limiter.NewALimiter( + cfg.ClickHouse.TagsMaxQueries, cfg.ClickHouse.TagsConcurrentQueries, cfg.ClickHouse.TagsAdaptiveQueries, + metricsEnabled, "tags", "all", + ) for i := range cfg.ClickHouse.QueryParams { - cfg.ClickHouse.QueryParams[i].Limiter = limiter.NewWLimiter(cfg.ClickHouse.QueryParams[i].MaxQueries, cfg.ClickHouse.QueryParams[i].MaxConcurrent, metricsEnabled, "render", duration.String(cfg.ClickHouse.QueryParams[i].Duration)) + cfg.ClickHouse.QueryParams[i].Limiter = limiter.NewALimiter( + cfg.ClickHouse.QueryParams[i].MaxQueries, cfg.ClickHouse.QueryParams[i].ConcurrentQueries, + cfg.ClickHouse.QueryParams[i].AdaptiveQueries, + metricsEnabled, "render", duration.String(cfg.ClickHouse.QueryParams[i].Duration), + ) } for u, q := range cfg.ClickHouse.UserLimits { - q.Limiter = limiter.NewWLimiter(q.MaxQueries, q.MaxConcurrent, metricsEnabled, u, "all") + q.Limiter = limiter.NewALimiter( + q.MaxQueries, q.ConcurrentQueries, q.AdaptiveQueries, metricsEnabled, u, "all", + ) cfg.ClickHouse.UserLimits[u] = q } return cfg, nil } +// NeedLoadAvgColect check if load avg collect is neeeded +func (c *Config) NeedLoadAvgColect() bool { + if c.ClickHouse.RenderAdaptiveQueries > 0 { + return true + } + if c.ClickHouse.FindAdaptiveQueries > 0 { + return true + } + if c.ClickHouse.TagsAdaptiveQueries > 0 { + return true + } + for _, u := range c.ClickHouse.UserLimits { + if u.AdaptiveQueries > 0 { + return true + } + } + return false +} + // ProcessDataTables checks if legacy `data`-table config is used, compiles regexps for `target-match-any` and `target-match-all` // parameters, sets the rollup configuration and proper context. 
func (c *Config) ProcessDataTables() (err error) { diff --git a/config/config_test.go b/config/config_test.go index 298bbf54a..dd5296c36 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -471,11 +471,11 @@ tree-timeout = "5s" connect-timeout = "2s" render-max-queries = 1000 -render-max-concurrent = 10 +render-concurrent-queries = 10 find-max-queries = 200 -find-max-concurrent = 8 +find-concurrent-queries = 8 tags-max-queries = 50 -tags-max-concurrent = 4 +tags-concurrent-queries = 4 query-params = [ { @@ -487,7 +487,7 @@ query-params = [ user-limits = { "alert" = { max-queries = 200, - max-concurrent = 10 + concurrent-queries = 10 } } @@ -613,11 +613,11 @@ sample-thereafter = 12 DataTimeout: 64000000000, QueryParams: []QueryParam{ { - Duration: 0, - URL: "http://somehost:8123", - DataTimeout: 64000000000, - MaxQueries: 1000, - MaxConcurrent: 10, + Duration: 0, + URL: "http://somehost:8123", + DataTimeout: 64000000000, + MaxQueries: 1000, + ConcurrentQueries: 10, }, { Duration: 72 * time.Hour, @@ -626,16 +626,16 @@ sample-thereafter = 12 Limiter: limiter.NoopLimiter{}, }, }, - RenderMaxQueries: 1000, - RenderMaxConcurrent: 10, - FindMaxQueries: 200, - FindMaxConcurrent: 8, - TagsMaxQueries: 50, - TagsMaxConcurrent: 4, + RenderMaxQueries: 1000, + RenderConcurrentQueries: 10, + FindMaxQueries: 200, + FindConcurrentQueries: 8, + TagsMaxQueries: 50, + TagsConcurrentQueries: 4, UserLimits: map[string]UserLimits{ "alert": { - MaxQueries: 200, - MaxConcurrent: 10, + MaxQueries: 200, + ConcurrentQueries: 10, }, }, IndexTable: "graphite_index", @@ -661,18 +661,333 @@ sample-thereafter = 12 r, _ = regexp.Compile("^reg$") expected.ClickHouse.IndexReverses[1] = &IndexReverseRule{"", "", "^reg$", r, "reversed"} for i := range config.ClickHouse.QueryParams { - if _, ok := config.ClickHouse.QueryParams[i].Limiter.(*limiter.WLimiter); ok && config.ClickHouse.QueryParams[i].MaxQueries > 0 && config.ClickHouse.QueryParams[i].MaxConcurrent > 0 { + if _, ok := config.ClickHouse.QueryParams[i].Limiter.(*limiter.WLimiter); ok && config.ClickHouse.QueryParams[i].MaxQueries > 0 && config.ClickHouse.QueryParams[i].ConcurrentQueries > 0 { config.ClickHouse.QueryParams[i].Limiter = nil } } - if _, ok := config.ClickHouse.FindLimiter.(*limiter.WLimiter); ok && config.ClickHouse.FindMaxQueries > 0 && config.ClickHouse.FindMaxConcurrent > 0 { + if _, ok := config.ClickHouse.FindLimiter.(*limiter.WLimiter); ok && config.ClickHouse.FindMaxQueries > 0 && config.ClickHouse.FindConcurrentQueries > 0 { config.ClickHouse.FindLimiter = nil } - if _, ok := config.ClickHouse.TagsLimiter.(*limiter.WLimiter); ok && config.ClickHouse.TagsMaxQueries > 0 && config.ClickHouse.TagsMaxConcurrent > 0 { + if _, ok := config.ClickHouse.TagsLimiter.(*limiter.WLimiter); ok && config.ClickHouse.TagsMaxQueries > 0 && config.ClickHouse.TagsConcurrentQueries > 0 { config.ClickHouse.TagsLimiter = nil } for u, q := range config.ClickHouse.UserLimits { - if _, ok := q.Limiter.(*limiter.WLimiter); ok && q.MaxQueries > 0 && q.MaxConcurrent > 0 { + if _, ok := q.Limiter.(*limiter.WLimiter); ok && q.MaxQueries > 0 && q.ConcurrentQueries > 0 { + q.Limiter = nil + config.ClickHouse.UserLimits[u] = q + } + } + + assert.Equal(t, expected.ClickHouse, config.ClickHouse) + + // Tags + expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output"} + assert.Equal(t, expected.Tags, config.Tags) + + // Carbonlink + expected.Carbonlink = Carbonlink{"server:3333", 5, 2, 250000000, 350000000, 800000000} + assert.Equal(t, 
expected.Carbonlink, config.Carbonlink) + + // Prometheus + expected.Prometheus = Prometheus{":9092", "https://server:3456/uri", nil, "Prometheus Time Series", 5 * time.Minute} + u, _ := url.Parse(expected.Prometheus.ExternalURLRaw) + expected.Prometheus.ExternalURL = u + assert.Equal(t, expected.Prometheus, config.Prometheus) + + // Debug + expected.Debug = Debug{"tests_tmp", os.FileMode(0755), os.FileMode(0640)} + assert.Equal(t, expected.Debug, config.Debug) + assert.DirExists(t, "tests_tmp") + + // Logger + expected.Logging = make([]zapwriter.Config, 2) + expected.Logging[0] = zapwriter.Config{ + Logger: "debugger", + File: "stdout", + Level: "debug", + Encoding: "console", + EncodingTime: "iso8601", + EncodingDuration: "string", + SampleTick: "5ms", + SampleInitial: 1, + SampleThereafter: 2, + } + expected.Logging[1] = zapwriter.Config{ + Logger: "logger", + File: "tests_tmp/logger.txt", + Level: "info", + Encoding: "json", + EncodingTime: "epoch", + EncodingDuration: "seconds", + SampleTick: "50ms", + SampleInitial: 10, + SampleThereafter: 12, + } + assert.Equal(t, expected.Logging, config.Logging) + + metrics.FindRequestMetric = nil + metrics.TagsRequestMetric = nil + metrics.RenderRequestMetric = nil + metrics.UnregisterAll() +} + +func TestReadConfigGraphiteWithALimiter(t *testing.T) { + body := []byte( + `[common] +listen = "[::1]:9090" +pprof-listen = "127.0.0.1:9091" +max-cpu = 15 +max-metrics-in-find-answer = 13 +max-metrics-per-target = 16 +target-blacklist = ['^blacklisted'] +memory-return-interval = "12s150ms" + +[metrics] +metric-endpoint = "127.0.0.1:2003" +metric-interval = "10s" +metric-prefix = "graphite" +ranges = { "1h" = "1h", "3d" = "72h", "7d" = "168h", "30d" = "720h", "90d" = "2160h" } + +[clickhouse] +url = "http://somehost:8123" +index-table = "graphite_index" +index-use-daily = false +index-reverse = "direct" +index-reverses = [ + {suffix = "suf", prefix = "pref", reverse = "direct"}, + {regex = "^reg$", reverse = "reversed"}, +] +tagged-table = "graphite_tags" +tagged-autocomplete-days = 5 +tagged-use-daily = false +tree-table = "tree" +reverse-tree-table = "reversed_tree" +date-tree-table = "data_tree" +date-tree-table-version = 2 +tag-table = "tag_table" +extra-prefix = "tum.pu-dum" +data-table = "data" +rollup-conf = "none" +max-data-points = 8000 +internal-aggregation = true +data-timeout = "64s" +index-timeout = "4s" +tree-timeout = "5s" +connect-timeout = "2s" + +render-max-queries = 1000 +render-concurrent-queries = 10 +render-adaptive-queries = 4 +find-max-queries = 200 +find-concurrent-queries = 8 +tags-max-queries = 50 +tags-concurrent-queries = 4 +tags-adaptive-queries = 3 + +query-params = [ + { + duration = "72h", + url = "http://localhost:8123/?max_rows_to_read=20000", + concurrent-queries = 4, + adaptive-queries = 6 + } +] + +user-limits = { + "alert" = { + max-queries = 200, + concurrent-queries = 10, + adaptive-queries = 5 + } +} + +# DataTable is tested in TestProcessDataTables +# [[data-table]] +# table = "another_data" +# rollup-conf = "auto" +# rollup-conf-table = "another_table" + +[tags] +rules = "filename" +date = "2012-12-12" +extra-where = "AND case" +input-file = "input" +output-file = "output" + +[carbonlink] +server = "server:3333" +threads-per-request = 5 +connect-timeout = "250ms" +query-timeout = "350ms" +total-timeout = "800ms" + +[prometheus] +listen = ":9092" +external-url = "https://server:3456/uri" +page-title = "Prometheus Time Series" +lookback-delta = "5m" + +[debug] +directory = "tests_tmp" +directory-perm = 0o755 
+external-data-perm = 0o640 + +[[logging]] +logger = "debugger" +file = "stdout" +level = "debug" +encoding = "console" +encoding-time = "iso8601" +encoding-duration = "string" +sample-tick = "5ms" +sample-initial = 1 +sample-thereafter = 2 + +[[logging]] +logger = "logger" +file = "tests_tmp/logger.txt" +level = "info" +encoding = "json" +encoding-time = "epoch" +encoding-duration = "seconds" +sample-tick = "50ms" +sample-initial = 10 +sample-thereafter = 12 +`, + ) + config, err := Unmarshal(body, false) + expected := New() + require.NoError(t, err) + assert.NotNil(t, metrics.Graphite) + metrics.Graphite = nil + + // Common + expected.Common = Common{ + Listen: "[::1]:9090", + PprofListen: "127.0.0.1:9091", + MaxCPU: 15, + MaxMetricsInFindAnswer: 13, + MaxMetricsPerTarget: 16, + TargetBlacklist: []string{"^blacklisted"}, + Blacklist: make([]*regexp.Regexp, 1), + MemoryReturnInterval: 12150000000, + FindCacheConfig: CacheConfig{ + Type: "null", + DefaultTimeoutSec: 0, + ShortTimeoutSec: 0, + }, + } + expected.Metrics = metrics.Config{ + MetricEndpoint: "127.0.0.1:2003", + MetricInterval: 10 * time.Second, + MetricTimeout: time.Second, + MetricPrefix: "graphite", + BucketsWidth: []int64{200, 500, 1000, 2000, 3000, 5000, 7000, 10000, 15000, 20000, 25000, 30000, 40000, 50000, 60000}, + BucketsLabels: []string{ + "_to_200ms", + "_to_500ms", + "_to_1000ms", + "_to_2000ms", + "_to_3000ms", + "_to_5000ms", + "_to_7000ms", + "_to_10000ms", + "_to_15000ms", + "_to_20000ms", + "_to_25000ms", + "_to_30000ms", + "_to_40000ms", + "_to_50000ms", + "_to_60000ms", + "_to_inf", + }, + // until-from = { "1h" = "1h", "3d" = "72h", "7d" = "168h", "30d" = "720h", "90d" = "2160h" } + Ranges: map[string]time.Duration{ + "1h": time.Hour, + "3d": 72 * time.Hour, + "7d": 168 * time.Hour, + "30d": 720 * time.Hour, + "90d": 2160 * time.Hour, + }, + RangeNames: []string{"1h", "3d", "7d", "30d", "90d", "history"}, + RangeS: []int64{3600, 259200, 604800, 2592000, 7776000, math.MaxInt64}, + } + r, _ := regexp.Compile(expected.Common.TargetBlacklist[0]) + expected.Common.Blacklist[0] = r + assert.Equal(t, expected.Common, config.Common) + assert.Equal(t, expected.Metrics, config.Metrics) + + // ClickHouse + expected.ClickHouse = ClickHouse{ + URL: "http://somehost:8123", + DataTimeout: 64000000000, + QueryParams: []QueryParam{ + { + Duration: 0, + URL: "http://somehost:8123", + DataTimeout: 64000000000, + MaxQueries: 1000, + ConcurrentQueries: 10, + AdaptiveQueries: 4, + }, + { + Duration: 72 * time.Hour, + URL: "http://localhost:8123/?max_rows_to_read=20000", + DataTimeout: 64000000000, + ConcurrentQueries: 4, + AdaptiveQueries: 6, + }, + }, + RenderMaxQueries: 1000, + RenderConcurrentQueries: 10, + RenderAdaptiveQueries: 4, + FindMaxQueries: 200, + FindConcurrentQueries: 8, + TagsMaxQueries: 50, + TagsConcurrentQueries: 4, + TagsAdaptiveQueries: 3, + UserLimits: map[string]UserLimits{ + "alert": { + MaxQueries: 200, + ConcurrentQueries: 10, + AdaptiveQueries: 5, + }, + }, + IndexTable: "graphite_index", + IndexReverse: "direct", + IndexReverses: make(IndexReverses, 2), + IndexTimeout: 4000000000, + TaggedTable: "graphite_tags", + TaggedAutocompleDays: 5, + TreeTable: "tree", + ReverseTreeTable: "reversed_tree", + DateTreeTable: "data_tree", + DateTreeTableVersion: 2, + TreeTimeout: 5000000000, + TagTable: "tag_table", + ExtraPrefix: "tum.pu-dum", + ConnectTimeout: 2000000000, + DataTableLegacy: "data", + RollupConfLegacy: "none", + MaxDataPoints: 8000, + InternalAggregation: true, + } + 
expected.ClickHouse.IndexReverses[0] = &IndexReverseRule{"suf", "pref", "", nil, "direct"} + r, _ = regexp.Compile("^reg$") + expected.ClickHouse.IndexReverses[1] = &IndexReverseRule{"", "", "^reg$", r, "reversed"} + for i := range config.ClickHouse.QueryParams { + if _, ok := config.ClickHouse.QueryParams[i].Limiter.(*limiter.ALimiter); ok { + config.ClickHouse.QueryParams[i].Limiter = nil + } + } + if _, ok := config.ClickHouse.FindLimiter.(*limiter.WLimiter); ok { + config.ClickHouse.FindLimiter = nil + } + if _, ok := config.ClickHouse.TagsLimiter.(*limiter.ALimiter); ok { + config.ClickHouse.TagsLimiter = nil + } + for u, q := range config.ClickHouse.UserLimits { + if _, ok := q.Limiter.(*limiter.ALimiter); ok { q.Limiter = nil config.ClickHouse.UserLimits[u] = q } diff --git a/deploy/doc/config.md b/deploy/doc/config.md index 2811d2118..8c0f6a744 100644 --- a/deploy/doc/config.md +++ b/deploy/doc/config.md @@ -71,15 +71,17 @@ query-params = [ ### Query limiter for prevent database overloading (limit concurrent/maximum incomming requests) For prevent database overloading incomming requests (render/find/autocomplete) can be limited. -If executing max-concurrent requests, next request will be wait for free slot until index-timeout reached If wait max-queries requests, for new request error returned immediately. - +If executing concurrent-queries requests, next request will be wait for free slot until index-timeout reached +adaptive-queries prevent overload with load average check if graphite-clickhouse run on one host with clickhouse +Real queries will be concurrent-queries + adaptive-queries * (1 / normalized_load_avg - 1). +If normalized_load_avg > 0.9, limit will be concurrent-queries. ``` url = "http://graphite:qwerty@localhost:8123/?readonly=2&log_queries=1&max_rows_to_read=102400000&max_result_bytes=12800000&max_threads=2" render-max-queries = 500 render-max-concurrent = 10 find-max-queries = 100 -find-max-concurrent = 10 +find-concurrent-queries = 10 tags-max-queries = 100 tags-max-concurrent = 10 diff --git a/doc/config.md b/doc/config.md index 29a054ffc..e783ab110 100644 --- a/doc/config.md +++ b/doc/config.md @@ -74,15 +74,17 @@ query-params = [ ### Query limiter for prevent database overloading (limit concurrent/maximum incomming requests) For prevent database overloading incomming requests (render/find/autocomplete) can be limited. -If executing max-concurrent requests, next request will be wait for free slot until index-timeout reached If wait max-queries requests, for new request error returned immediately. - +If executing concurrent-queries requests, next request will be wait for free slot until index-timeout reached +adaptive-queries prevent overload with load average check if graphite-clickhouse run on one host with clickhouse +Real queries will be concurrent-queries + adaptive-queries * (1 / normalized_load_avg - 1). +If normalized_load_avg > 0.9, limit will be concurrent-queries. 
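As a worked illustration of the formula above (the numbers are hypothetical, not defaults): with `concurrent-queries = 10` and `adaptive-queries = 40`,

```
normalized_load_avg = 0.2  ->  10 + 40 * (1/0.2 - 1) = 170 slots
normalized_load_avg = 0.8  ->  10 + 40 * (1/0.8 - 1) = 20  slots
normalized_load_avg > 0.9  ->  10 slots (floored at concurrent-queries)
```

so the extra adaptive slots disappear and the effective concurrency falls back to `concurrent-queries` as the normalized load average of the ClickHouse host approaches 1.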
``` url = "http://graphite:qwerty@localhost:8123/?readonly=2&log_queries=1&max_rows_to_read=102400000&max_result_bytes=12800000&max_threads=2" render-max-queries = 500 render-max-concurrent = 10 find-max-queries = 100 -find-max-concurrent = 10 +find-concurrent-queries = 10 tags-max-queries = 100 tags-max-concurrent = 10 @@ -239,16 +241,22 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str data-timeout = "1m0s" # Max queries to render queiries render-max-queries = 0 - # Maximum concurrent queries to render queiries - render-max-concurrent = 0 + # Concurrent queries to render queiries + render-concurrent-queries = 0 + # Render adaptive queries (based on load average) for increase/decrease concurrent queries + render-adaptive-queries = 0 # Max queries for find queries find-max-queries = 0 - # Maximum concurrent queries for find queries - find-max-concurrent = 0 + # Find concurrent queries for find queries + find-concurrent-queries = 0 + # Find adaptive queries (based on load average) for increase/decrease concurrent queries + find-adaptive-queries = 0 # Max queries for tags queries tags-max-queries = 0 - # Maximum concurrent queries for tags queries - tags-max-concurrent = 0 + # Concurrent queries for tags queries + tags-concurrent-queries = 0 + # Tags adaptive queries (based on load average) for increase/decrease concurrent queries + tags-adaptive-queries = 0 # Minimum tags in seriesByTag query tags-min-in-query = 0 # Minimum tags in autocomplete query diff --git a/doc/config.md.orig b/doc/config.md.orig new file mode 100644 index 000000000..29a054ffc --- /dev/null +++ b/doc/config.md.orig @@ -0,0 +1,399 @@ +[//]: # (This file is built out of deploy/doc/config.md, please do not edit it manually) +[//]: # (To rebuild it run `make config`) + +# Configuration + +## Common `[common]` + +### Finder cache + +Specify what storage to use for finder cache. This cache stores finder results (metrics find/tags autocomplete/render). + +Supported cache types: + - `mem` - will use integrated in-memory cache. Not distributed. Fast. + - `memcache` - will use specified memcache servers. Could be shared. Slow. + - `null` - disable cache + +Extra options: + - `size_mb` - specify max size of cache, in MiB + - `defaultTimeoutSec` - specify default cache ttl. + - `shortTimeoutSec` - cache ttl for short duration intervals of render queries (duration <= shortDuration && now-until <= 61) (if 0, disable this cache) + - `findTimeoutSec` - cache ttl for finder/tags autocompleter queries (if 0, disable this cache) + - `shortDuration` - maximum duration for render queries, which use shortTimeoutSec duration + +### Example +```yaml +[common.find-cache] +type = "memcache" +size_mb = 0 +memcachedServers = [ "127.0.0.1:1234", "127.0.0.2:1235" ] +defaultTimeoutSec = 10800 +shortTimeoutSec = 300 +findTimeoutSec = 600 +``` + +## ClickHouse `[clickhouse]` + +### URL `url` +Detailed explanation of ClickHouse HTTP interface is given in [documentation](https://clickhouse.tech/docs/en/interfaces/http). It's recommended to create a dedicated read-only user for graphite-clickhouse. + +Example: `url = "http://graphite:qwerty@localhost:8123/?readonly=2&log_queries=1"` + +Some useful parameters: + +- [log_queries=1](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-log-queries): all queries will be logged in the `system.query_log` table. Useful for debug. 
+- [readonly=2](https://clickhouse.tech/docs/en/operations/settings/permissions-for-queries/#settings_readonly): do not change data on the server +- [max_rows_to_read=200000000](https://clickhouse.tech/docs/en/operations/settings/query-complexity/#max-rows-to-read): useful if you want to prevent too broad requests +- [cancel_http_readonly_queries_on_client_close=1](https://clickhouse.tech/docs/en/operations/settings/settings/#cancel-http-readonly-queries-on-client-close): cancel DB query when request is canceled. + +All these and more settings can be set in clickhouse-server configuration as user's profile settings. + +Useless settings: + +- `max_query_size`: at the moment [external data](https://clickhouse.tech/docs/en/engines/table-engines/special/external-data/) is used, the query length is relatively small and always less than the default [262144](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_query_size) +- `max_ast_elements`: the same +- `max_execution_time`: with `cancel_http_readonly_queries_on_client_close=1` and `data-timeout = "1m"` it's already covered. + +### Query multi parameters (for overwrite default url and data-timeout) + +For queries with duration (until - from) >= 72 hours, use custom url and data-timeout + +``` +url = "http://graphite:qwerty@localhost:8123/?readonly=2&log_queries=1&max_rows_to_read=102400000&max_result_bytes=12800000&max_threads=2" +data-timeout = "30s" + +query-params = [ + { + duration = "72h", + url = "http://graphite:qwerty@localhost:8123/?readonly=2&log_queries=1&max_rows_to_read=1024000000&max_result_bytes=128000000&max_threads=1", + data-timeout = "60s" + } +] +``` + +### Query limiter for prevent database overloading (limit concurrent/maximum incomming requests) + +For prevent database overloading incomming requests (render/find/autocomplete) can be limited. +If executing max-concurrent requests, next request will be wait for free slot until index-timeout reached +If wait max-queries requests, for new request error returned immediately. + +``` +url = "http://graphite:qwerty@localhost:8123/?readonly=2&log_queries=1&max_rows_to_read=102400000&max_result_bytes=12800000&max_threads=2" +render-max-queries = 500 +render-max-concurrent = 10 +find-max-queries = 100 +find-max-concurrent = 10 +tags-max-queries = 100 +tags-max-concurrent = 10 + +query-params = [ + { + duration = "72h", + url = "http://graphite:qwerty@localhost:8123/?readonly=2&log_queries=1&max_rows_to_read=1024000000&max_result_bytes=128000000&max_threads=1", + data-timeout = "60s" + max-queries = 100, + max-concurrent = 4 + } +] + +user-limits = { + "alerting" = { + max-queries = 100, + max-concurrent = 5 + } +} + +``` + +### Index table +See [index table](./index-table.md) documentation for details. + +### Index reversed queries tuning +By default the daemon decides to make a direct or reversed request to the [index table](./index-table.md) based on a first and last glob node in the metric. It choose the most long path to reduce readings. Additional examples can be found in [tests](../finder/index_test.go). + +You can overwrite automatic behavior with `index-reverse`. Valid values are `"auto", direct, "reversed"` + +If you need fine tuning for different paths, you can use `[[clickhouse.index-reverses]]` to set behavior per metrics' `prefix`, `suffix` or `regexp`. + +### Tags table +By default, tags are stored in the tagged-table on the daily basis. If a metric set doesn't change much, that leads to situation when the same data stored multiple times. 
+To prevent uncontrolled growth and reduce the amount of data stored in the tagged-table, the `tagged-use-daily` parameter could be set to `false` and table definition could be changed to something like: +``` +CREATE TABLE graphite_tagged ( + Date Date, + Tag1 String, + Path String, + Tags Array(String), + Version UInt32 +) ENGINE = ReplacingMergeTree(Date) +ORDER BY (Tag1, Path); +``` + +For restrict costly seriesByTag (may be like `seriesByTag('name=~test.*.*.rabbitmq_overview.connections')` or `seriesByTag('name=test.*.*.rabbitmq_overview.connections')`) use tags-min-in-query parameter. +For restrict costly autocomplete queries use tags-min-in-autocomplete parameter. + +set for require at minimum 1 eq argument (without wildcards) +`tags-min-in-query=1` + + +`ReplacingMergeTree(Date)` prevent broken tags autocomplete with default `ReplacingMergeTree(Version)`, when write to the past. + +### ClickHouse aggregation +For detailed description of `max-data-points` and `internal-aggregation` see [aggregation documentation](./aggregation.md). + +## Data tables `[[data-table]]` + +### Rollup +The rollup configuration is used for a proper metrics pre-aggregation. It contains two rules types: + +- retention for point per time range +- aggregation function for a values + +Historically, the way to define the config was `rollup-conf = "/path/to/the/conf/with/graphite_rollup.xml"`. The format is the same as [graphite_rollup](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/graphitemergetree/#rollup-configuration) scheme for ClickHouse server. + +For a quite long time it's recommended to use `rollup-conf = "auto"` to get the configuration from remote ClickHouse server. It will update itself on each `rollup-auto-interval` (1 minute by default) or once on startup if set to "0s". + +If you don't use a `GraphiteMergeTree` family engine, you can still use `rollup-conf = "auto"` by setting `rollup-auto-table="graphiteMergeTreeTable"` and get the proper config. In this case `graphiteMergeTreeTable` is a dummy table associated with proper [graphite_rollup](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/graphitemergetree/#rollup-configuration). The cases when you may need it: + +- ReplacingMergeTree engine +- Distributed engine +- Materialized view + +It's possible as well to set `rollup-conf = "none"`. Then values from `rollup-default-precision` and `rollup-default-function` will be used. + +#### Additional rollup tuning for reversed data tables +When `reverse = true` is set for data-table, there are two possibles cases for [graphite_rollup](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/graphitemergetree/#rollup-configuration): + +- Original regexps are used, like `^level_one.level_two.suffix$` +- Reversed regexps are used, like `^suffix.level_two.level_one$` + +Depends on it for having a proper retention and aggregation you must additionally set `rollup-use-reverted = true` for the first case and `rollup-use-reverted = false` for the second. + +#### Additional tuning tagged find for seriesByTag and autocomplete +Only one tag used as filter for index field Tag1, see graphite_tagged table [structure](https://github.com/lomik/ +```toml +[common] + # general listener + listen = ":9090" + # listener to serve /debug/pprof requests. 
'-pprof' argument overrides it + pprof-listen = "" + max-cpu = 1 + # limit number of results from find query, 0=unlimited + max-metrics-in-find-answer = 0 + # limit numbers of queried metrics per target in /render requests, 0 or negative = unlimited + max-metrics-per-target = 15000 + # daemon returns empty response if query matches any of regular expressions + # target-blacklist = [] + # daemon will return the freed memory to the OS when it>0 + memory-return-interval = "0s" + # additional request headers to log + headers-to-log = [] + + # find/tags cache config + [common.find-cache] + # cache type + type = "null" + # cache size + size-mb = 0 + # memcached servers + memcached-servers = [] + # default cache ttl + default-timeout = 0 + # short-time cache ttl + short-timeout = 0 + # finder/tags autocompleter cache ttl + find-timeout = 0 + # maximum diration, used with short_timeout + short-duration = "0s" + # offset beetween now and until for select short cache timeout + short-offset = 0 + +[metrics] + # graphite relay address + metric-endpoint = "" + # statsd server address + statsd-endpoint = "" + # graphite metrics send interval + metric-interval = "0s" + # graphite metrics send timeout + metric-timeout = "0s" + # graphite metrics prefix + metric-prefix = "" + # Request historgram buckets widths + request-buckets = [] + # Request historgram buckets labels + request-labels = [] + + # Additional separate stats for until-from ranges + [metrics.ranges] + + # Additional separate stats for until-from find ranges + [metrics.find-ranges] + # Extended metrics + extended-stat = false + +[clickhouse] + # default url, see https://clickhouse.tech/docs/en/interfaces/http. Can be overwritten with query-params + url = "http://localhost:8123?cancel_http_readonly_queries_on_client_close=1" + # default total timeout to fetch data, can be overwritten with query-params + data-timeout = "1m0s" + # Max queries to render queiries + render-max-queries = 0 + # Maximum concurrent queries to render queiries + render-max-concurrent = 0 + # Max queries for find queries + find-max-queries = 0 + # Maximum concurrent queries for find queries + find-max-concurrent = 0 + # Max queries for tags queries + tags-max-queries = 0 + # Maximum concurrent queries for tags queries + tags-max-concurrent = 0 + # Minimum tags in seriesByTag query + tags-min-in-query = 0 + # Minimum tags in autocomplete query + tags-min-in-autocomplete = 0 + + # customized query limiter for some users + # [clickhouse.user-limits] + # Date format (default, utc, both) + date-format = "" + # see doc/index-table.md + index-table = "graphite_index" + index-use-daily = true + # see doc/config.md + index-reverse = "auto" + + # [[clickhouse.index-reverses]] + # rule is used when the target suffix is matched + # suffix = "suffix" + # same as index-reverse + # reverse = "auto" + + # [[clickhouse.index-reverses]] + # rule is used when the target prefix is matched + # prefix = "prefix" + # same as index-reverse + # reverse = "direct" + + # [[clickhouse.index-reverses]] + # rule is used when the target regex is matched + # regex = "regex" + # same as index-reverse + # reverse = "reversed" + # total timeout to fetch series list from index + index-timeout = "1m0s" + # 'tagged' table from carbon-clickhouse, required for seriesByTag + tagged-table = "graphite_tagged" + # or how long the daemon will query tags during autocomplete + tagged-autocomplete-days = 7 + # whether to use date filter when searching for the metrics in the tagged-table + tagged-use-daily = true + + # 
costs for tags (for tune which tag will be used as primary), by default is 0, increase for costly (with poor selectivity) tags + # [clickhouse.tagged-costs] + # old index table, DEPRECATED, see description in doc/config.md + # tree-table = "" + # reverse-tree-table = "" + # date-tree-table = "" + # date-tree-table-version = 0 + # tree-timeout = "0s" + # is not recommended to use, https://github.com/lomik/graphite-clickhouse/wiki/TagsRU + # tag-table = "" + # add extra prefix (directory in graphite) for all metrics, w/o trailing dot + extra-prefix = "" + # TCP connection timeout + connect-timeout = "1s" + # will be removed in 0.14 + # data-table = "" + # rollup-conf = "auto" + # max points per metric when internal-aggregation=true + max-data-points = 1048576 + # ClickHouse-side aggregation, see doc/aggregation.md + internal-aggregation = true + +[[data-table]] + # data table from carbon-clickhouse + table = "graphite_data" + # if it stores direct or reversed metrics + reverse = false + # maximum age stored in the table + max-age = "0s" + # minimum age stored in the table + min-age = "0s" + # maximum until-from interval allowed for the table + max-interval = "0s" + # minimum until-from interval allowed for the table + min-interval = "0s" + # table allowed only if any metrics in target matches regexp + target-match-any = "" + # table allowed only if all metrics in target matches regexp + target-match-all = "" + # custom rollup.xml file for table, 'auto' and 'none' are allowed as well + rollup-conf = "auto" + # custom table for 'rollup-conf=auto', useful for Distributed or MatView + rollup-auto-table = "" + # rollup update interval for 'rollup-conf=auto' + rollup-auto-interval = "1m0s" + # is used when none of rules match + rollup-default-precision = 0 + # is used when none of rules match + rollup-default-function = "" + # should be set to true if you don't have reverted regexps in rollup-conf for reversed tables + rollup-use-reverted = false + # valid values are 'graphite' of 'prometheus' + context = [] + +# is not recommended to use, https://github.com/lomik/graphite-clickhouse/wiki/TagsRU +# [tags] + # rules = "" + # date = "" + # extra-where = "" + # input-file = "" + # output-file = "" + +[carbonlink] + server = "" + threads-per-request = 10 + connect-timeout = "50ms" + query-timeout = "50ms" + # timeout for querying and parsing response + total-timeout = "500ms" + +[prometheus] + # listen addr for prometheus ui and api + listen = ":9092" + # allows to set URL for redirect manually + external-url = "" + page-title = "Prometheus Time Series Collection and Processing Server" + lookback-delta = "5m0s" + +# see doc/debugging.md +[debug] + # the directory for additional debug output + directory = "" + # permissions for directory, octal value is set as 0o755 + directory-perm = 493 + # permissions for directory, octal value is set as 0o640 + external-data-perm = 0 + +[[logging]] + # handler name, default empty + logger = "" + # '/path/to/filename', 'stderr', 'stdout', 'empty' (=='stderr'), 'none' + file = "/var/log/graphite-clickhouse/graphite-clickhouse.log" + # 'debug', 'info', 'warn', 'error', 'dpanic', 'panic', and 'fatal' + level = "info" + # 'json' or 'console' + encoding = "mixed" + # 'millis', 'nanos', 'epoch', 'iso8601' + encoding-time = "iso8601" + # 'seconds', 'nanos', 'string' + encoding-duration = "seconds" + # passed to time.ParseDuration + sample-tick = "" + # first n messages logged per tick + sample-initial = 0 + # every m-th message logged thereafter per tick + 
sample-thereafter = 0 +``` diff --git a/graphite-clickhouse.go b/graphite-clickhouse.go index e9b4ef552..cba8e9d34 100644 --- a/graphite-clickhouse.go +++ b/graphite-clickhouse.go @@ -21,6 +21,7 @@ import ( "github.com/lomik/graphite-clickhouse/find" "github.com/lomik/graphite-clickhouse/healthcheck" "github.com/lomik/graphite-clickhouse/index" + "github.com/lomik/graphite-clickhouse/load_avg" "github.com/lomik/graphite-clickhouse/logs" "github.com/lomik/graphite-clickhouse/metrics" "github.com/lomik/graphite-clickhouse/pkg/scope" @@ -213,5 +214,17 @@ func main() { metrics.Graphite.Start(nil) } + if cfg.NeedLoadAvgColect() { + go func() { + for { + load1, err := load_avg.Normalized() + if err == nil { + load_avg.Store(load1) + } + time.Sleep(time.Second * 10) + } + }() + } + log.Fatal(http.ListenAndServe(cfg.Common.Listen, mux)) } diff --git a/limiter/alimiter.go b/limiter/alimiter.go new file mode 100644 index 000000000..19b5a5c9e --- /dev/null +++ b/limiter/alimiter.go @@ -0,0 +1,162 @@ +package limiter + +import ( + "context" + "time" + + "github.com/lomik/graphite-clickhouse/load_avg" + "github.com/lomik/graphite-clickhouse/metrics" +) + +var ( + ctxMain, Stop = context.WithCancel(context.Background()) + checkDelay = time.Second * 60 +) + +func getWeighted(n, max int) int { + if n <= 0 { + return 0 + } + loadAvg := load_avg.Load() + if loadAvg < 1 { + return 0 + } + + l := int(float64(n) * loadAvg) + if l >= max { + if max <= 1 { + return 1 + } + return max - 1 + } + + return l +} + +// ALimiter provide limiter amount of requests/concurrently executing requests (adaptive with load avg) +type ALimiter struct { + l limiter + cL limiter + c int + n int + + m metrics.WaitMetric +} + +// NewServerLimiter creates a limiter for specific servers list. +func NewALimiter(l, c, n int, enableMetrics bool, scope, sub string) ServerLimiter { + if l <= 0 && c <= 0 { + return NoopLimiter{} + } + if n >= c { + n = c - 1 + } + if n <= 0 { + return NewWLimiter(l, c, enableMetrics, scope, sub) + } + + a := &ALimiter{ + m: metrics.NewWaitMetric(enableMetrics, scope, sub), c: c, n: n, + } + a.cL.ch = make(chan struct{}, c) + a.cL.cap = c + + go a.balance() + + return a +} + +func (sl *ALimiter) balance() int { + var last int + for { + start := time.Now() + n := getWeighted(sl.n, sl.c) + if n > last { + for i := 0; i < n-last; i++ { + if sl.cL.enter(ctxMain, "balance") != nil { + break + } + } + last = n + } else if n < last { + for i := 0; i < last-n; i++ { + sl.cL.leave(ctxMain, "balance") + } + last = n + } + delay := time.Since(start) + if delay < checkDelay { + time.Sleep(checkDelay - delay) + } + } +} + +func (sl *ALimiter) Capacity() int { + return sl.l.capacity() +} + +func (sl *ALimiter) Enter(ctx context.Context, s string) (err error) { + if sl.l.cap > 0 { + if err = sl.l.tryEnter(ctx, s); err != nil { + sl.m.WaitErrors.Add(1) + return + } + } + if sl.cL.cap > 0 { + if sl.cL.enter(ctx, s) != nil { + if sl.l.cap > 0 { + sl.l.leave(ctx, s) + } + sl.m.WaitErrors.Add(1) + err = ErrTimeout + } + } + sl.m.Requests.Add(1) + return +} + +// TryEnter claims one of free slots without blocking. 
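+//
+// Hypothetical caller sketch (not part of this change): a successful TryEnter
+// must be paired with Leave, and a non-nil error means the request was rejected:
+//
+//	if err := lim.TryEnter(ctx, "render"); err != nil {
+//		return err
+//	}
+//	defer lim.Leave(ctx, "render")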
+func (sl *ALimiter) TryEnter(ctx context.Context, s string) (err error) { + if sl.l.cap > 0 { + if err = sl.l.tryEnter(ctx, s); err != nil { + sl.m.WaitErrors.Add(1) + return + } + } + if sl.cL.cap > 0 { + if sl.cL.tryEnter(ctx, s) != nil { + if sl.l.cap > 0 { + sl.l.leave(ctx, s) + } + sl.m.WaitErrors.Add(1) + err = ErrTimeout + } + } + sl.m.Requests.Add(1) + return +} + +// Frees a slot in limiter +func (sl *ALimiter) Leave(ctx context.Context, s string) { + if sl.l.cap > 0 { + sl.l.leave(ctx, s) + } + sl.cL.leave(ctx, s) +} + +// SendDuration send StatsD duration iming +func (sl *ALimiter) SendDuration(queueMs int64) { + if sl.m.WaitTimeName != "" { + metrics.Gstatsd.Timing(sl.m.WaitTimeName, queueMs, 1.0) + } +} + +// Unregiter unregister graphite metric +func (sl *ALimiter) Unregiter() { + sl.m.Unregister() +} + +// Enabled return enabled flag, if false - it's a noop limiter and can be safely skiped +func (sl *ALimiter) Enabled() bool { + return true +} diff --git a/limiter/alimiter_test.go b/limiter/alimiter_test.go new file mode 100644 index 000000000..01c22e569 --- /dev/null +++ b/limiter/alimiter_test.go @@ -0,0 +1,183 @@ +package limiter + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + "github.com/lomik/graphite-clickhouse/load_avg" + "github.com/stretchr/testify/require" +) + +func Test_getWeighted(t *testing.T) { + tests := []struct { + loadAvg float64 + c int + n int + want int + }{ + {loadAvg: 0, c: 100, n: 100, want: 0}, + {loadAvg: 0.2, c: 100, n: 100, want: 0}, + {loadAvg: 0.999, c: 100, n: 1, want: 0}, + {loadAvg: 1, c: 1, n: 100, want: 1}, + {loadAvg: 1, c: 100, n: 100, want: 99}, + {loadAvg: 1, c: 101, n: 100, want: 100}, + {loadAvg: 1, c: 200, n: 100, want: 100}, + {loadAvg: 2, c: 100, n: 200, want: 99}, + {loadAvg: 2, c: 200, n: 200, want: 199}, + {loadAvg: 2, c: 300, n: 200, want: 299}, + {loadAvg: 2, c: 400, n: 200, want: 399}, + {loadAvg: 2, c: 401, n: 200, want: 400}, + {loadAvg: 2, c: 402, n: 200, want: 400}, + } + for n, tt := range tests { + t.Run(strconv.Itoa(n), func(t *testing.T) { + load_avg.Store(tt.loadAvg) + if got := getWeighted(tt.n, tt.c); got != tt.want { + t.Errorf("load avg = %f getWeighted(%d) = %v, want %v", tt.loadAvg, tt.n, got, tt.want) + } + }) + } +} + +func TestNewALimiter(t *testing.T) { + l := 14 + c := 12 + n := 10 + checkDelay = time.Millisecond * 10 + limiter := NewALimiter(l, c, n, false, "", "") + + // inital - load not collected + load_avg.Store(0) + + var i int + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + + for i = 0; i < c; i++ { + require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0 [%d]", i) + } + + require.Error(t, limiter.Enter(ctx, "render")) + + for i = 0; i < c; i++ { + limiter.Leave(ctx, "render") + } + + cancel() + + // load_avg 0.5 + load_avg.Store(0.5) + k := getWeighted(n, c) + require.Equal(t, n/2, k) + + time.Sleep(checkDelay * 2) + + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100) + for i = 0; i < c-k; i++ { + require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0.5 [%d]", i) + } + + require.Error(t, limiter.Enter(ctx, "render")) + + for i = 0; i < c-k; i++ { + limiter.Leave(ctx, "render") + } + + cancel() + + // // load_avg 1 + load_avg.Store(1) + k = getWeighted(n, c) + require.Equal(t, n, k) + + time.Sleep(checkDelay * 2) + + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) + for i = 0; i < c-k; i++ { + require.NoError(t, 
limiter.Enter(ctx, "render"), "try to lock with load_avg = 1 [%d]", i) + } + + require.Error(t, limiter.Enter(ctx, "render")) + + for i = 0; i < k; i++ { + limiter.Leave(ctx, "render") + } + + cancel() +} + +type testLimiter struct { + l int + c int + n int + concurrencyLevel int +} + +func Benchmark_Limiter_Parallel(b *testing.B) { + tests := []testLimiter{ + // WLimiter + {l: 2000, c: 10, concurrencyLevel: 1}, + {l: 2000, c: 10, concurrencyLevel: 10}, + {l: 2000, c: 10, concurrencyLevel: 20}, + {l: 2000, c: 10, concurrencyLevel: 50}, + {l: 2000, c: 10, concurrencyLevel: 100}, + {l: 2000, c: 10, concurrencyLevel: 1000}, + // ALimiter + {l: 2000, c: 10, n: 50, concurrencyLevel: 1}, + {l: 2000, c: 10, n: 50, concurrencyLevel: 10}, + {l: 2000, c: 10, n: 50, concurrencyLevel: 20}, + {l: 2000, c: 10, n: 50, concurrencyLevel: 50}, + {l: 2000, c: 10, n: 50, concurrencyLevel: 100}, + {l: 2000, c: 10, n: 50, concurrencyLevel: 1000}, + } + + load_avg.Store(0.5) + for _, tt := range tests { + + b.Run(fmt.Sprintf("L%d_C%d_N%d_CONCURRENCY%d", tt.l, tt.c, tt.n, tt.concurrencyLevel), func(b *testing.B) { + var ( + err error + ) + + limiter := NewALimiter(tt.l, tt.c, tt.n, false, "", "") + + wgStart := sync.WaitGroup{} + wg := sync.WaitGroup{} + wgStart.Add(tt.concurrencyLevel) + + ctx := context.Background() + + b.ResetTimer() + for i := 0; i < tt.concurrencyLevel; i++ { + wg.Add(1) + go func() { + wgStart.Done() + wgStart.Wait() + // Test routine + for n := 0; n < b.N; n++ { + errW := limiter.Enter(ctx, "render") + if errW == nil { + limiter.Leave(ctx, "render") + } else { + err = errW + break + } + } + // End test routine + wg.Done() + }() + + } + + wg.Wait() + b.StopTimer() + + if err != nil { + b.Fatal(b, err) + } + }) + } +} diff --git a/limiter/wlimiter.go b/limiter/wlimiter.go index 98ecd43be..fa49cc9af 100644 --- a/limiter/wlimiter.go +++ b/limiter/wlimiter.go @@ -6,7 +6,7 @@ import ( "github.com/lomik/graphite-clickhouse/metrics" ) -// WLimiter provides interface to limit amount of requests/concurrently executing requests +// WLimiter provide limiter amount of requests/concurrently executing requests type WLimiter struct { l limiter cL limiter @@ -18,6 +18,9 @@ func NewWLimiter(l, c int, enableMetrics bool, scope, sub string) ServerLimiter if l <= 0 && c <= 0 { return NoopLimiter{} } + if c <= 0 { + return NewLimiter(l, enableMetrics, scope, sub) + } w := &WLimiter{ m: metrics.NewWaitMetric(enableMetrics, scope, sub), @@ -46,7 +49,9 @@ func (sl *WLimiter) Enter(ctx context.Context, s string) (err error) { } if sl.cL.cap > 0 { if sl.cL.enter(ctx, s) != nil { - sl.l.leave(ctx, s) + if sl.l.cap > 0 { + sl.l.leave(ctx, s) + } sl.m.WaitErrors.Add(1) err = ErrTimeout } @@ -65,7 +70,9 @@ func (sl *WLimiter) TryEnter(ctx context.Context, s string) (err error) { } if sl.cL.cap > 0 { if sl.cL.tryEnter(ctx, s) != nil { - sl.l.leave(ctx, s) + if sl.l.cap > 0 { + sl.l.leave(ctx, s) + } sl.m.WaitErrors.Add(1) err = ErrTimeout } @@ -76,7 +83,9 @@ func (sl *WLimiter) TryEnter(ctx context.Context, s string) (err error) { // Frees a slot in limiter func (sl *WLimiter) Leave(ctx context.Context, s string) { - sl.l.leave(ctx, s) + if sl.l.cap > 0 { + sl.l.leave(ctx, s) + } sl.cL.leave(ctx, s) } diff --git a/load_avg/load_avg.go b/load_avg/load_avg.go new file mode 100644 index 000000000..64d95008a --- /dev/null +++ b/load_avg/load_avg.go @@ -0,0 +1,51 @@ +package load_avg + +import ( + "os" + "strings" + "syscall" + + "github.com/msaf1980/go-stringutils" + "github.com/msaf1980/go-syncutils/atomic" +) + +var ( + 
loadAvgStore atomic.Float64 +) + +func Normalized() (float64, error) { + var info syscall.Sysinfo_t + err := syscall.Sysinfo(&info) + if err != nil { + return 0, err + } + + cpus, err := CpuCount() + if err != nil { + return 0, err + } + + const si_load_shift = 16 + load1 := float64(info.Loads[0]) / float64(1<