Add system metrics (#313)
* [wip] add cpu metrics

* [sqlite] add metrics

* Fix after rebase

* [sqlite] small fixes

* add groups

* [metadata] migration from blob to text

* add groups

* small fixes

* add prefix check

* Fix after rebase

* fix: user request error

* [wip] add collector

* [wip] add collector

* add psi metrics

* [wip] add builtin modified metrics

* fix system metrics builtin

* add dc to system metrics

* add disk usage

* small fixes

* small fixes

* system metrics: sync send time

* system metrics: add duration scrape metric

---------

Co-authored-by: e.martyn <[email protected]>
nevgeny and e.martyn authored Apr 24, 2023
1 parent a9606a0 commit f5e9393
Showing 15 changed files with 1,623 additions and 46 deletions.
25 changes: 25 additions & 0 deletions cmd/collector_test/collector.go
@@ -0,0 +1,25 @@
package main

import (
	"log"
	"os"
	"time"

	"github.com/vkcom/statshouse/internal/stats"
)

func main() {
	host, err := os.Hostname()
	if err != nil {
		panic(err)
	}
	collector, err := stats.NewCollectorManager(stats.CollectorManagerOptions{ScrapeInterval: time.Second, HostName: host}, nil, log.New(os.Stderr, "[collector]", 0))
	if err != nil {
		log.Panic(err)
	}
	defer collector.StopCollector()
	err = collector.RunCollector()
	if err != nil {
		panic(err)
	}
}
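This small binary apparently exists to exercise the collector in isolation: it scrapes every second under the local hostname and, since the second argument to NewCollectorManager is nil, presumably discards the scraped values rather than writing them anywhere. From the repository root it should run with something like go run ./cmd/collector_test, logging to stderr with the [collector] prefix until interrupted.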
17 changes: 11 additions & 6 deletions cmd/statshouse/argv.go
@@ -11,6 +11,7 @@ import (
"fmt"
"log"
"os"
"time"

"github.com/vkcom/statshouse/internal/vkgo/build"
"github.com/vkcom/statshouse/internal/vkgo/rpc"
@@ -43,12 +44,13 @@

cluster string // common for agent and ingress proxy

configAgent agent.Config
maxCores int
listenAddr string
coresUDP int
bufferSizeUDP int
promRemoteMod bool
configAgent agent.Config
maxCores int
listenAddr string
coresUDP int
bufferSizeUDP int
promRemoteMod bool
hardwareMetricScrapeInterval time.Duration

configAggregator aggregator.ConfigAggregator

@@ -148,6 +150,9 @@ func argvAddAgentFlags(legacyVerb bool) {
flag.IntVar(&argv.maxCores, "cores", -1, "CPU cores usage limit. 0 all available, <0 use (cores-udp*3/2 + 1)")

flag.BoolVar(&argv.promRemoteMod, "prometheus-push-remote", false, "use remote pusher for prom metrics")

flag.DurationVar(&argv.hardwareMetricScrapeInterval, "hardware-metric-scrape-interval", time.Second, "how often hardware metrics will be scraped")

}

func argvAddAggregatorFlags(legacyVerb bool) {
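With the flag registered above, the hardware scrape frequency can presumably be tuned per agent at startup, e.g. -hardware-metric-scrape-interval=5s to scrape every five seconds; the default stays at the one-second interval passed to flag.DurationVar.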
13 changes: 13 additions & 0 deletions cmd/statshouse/statshouse.go
@@ -22,6 +22,7 @@ import (
"syscall"
"time"

"github.com/vkcom/statshouse/internal/stats"
"github.com/vkcom/statshouse/internal/vkgo/build"
"github.com/vkcom/statshouse/internal/vkgo/rpc"
"github.com/vkcom/statshouse/internal/vkgo/srvfunc"
@@ -352,6 +353,18 @@ func mainAgent(aesPwd string, dc *pcache.DiskCache) int {
		}
	}()

	m, err := stats.NewCollectorManager(stats.CollectorManagerOptions{ScrapeInterval: argv.hardwareMetricScrapeInterval, HostName: argv.customHostName}, w, logErr)
	if err != nil {
		logErr.Println("failed to init hardware collector", err.Error())
	} else {
		go func() {
			err := m.RunCollector()
			if err != nil {
				logErr.Println("failed to run hardware collector", err.Error())
			}
		}()
		defer m.StopCollector()
	}
	chSignal := make(chan os.Signal, 1)
	signal.Notify(chSignal, syscall.SIGINT, sigLogRotate)

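Note that a failure to construct the hardware collector is only logged, so the agent presumably keeps running without hardware metrics; when construction succeeds, RunCollector runs in its own goroutine and the deferred StopCollector ties the collector's shutdown to mainAgent returning.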
106 changes: 71 additions & 35 deletions internal/format/builtin.go
@@ -82,29 +82,32 @@ const (
BuiltinMetricIDAPISelectDuration = -68
BuiltinMetricIDAgentHistoricQueueSizeSum = -69
BuiltinMetricIDAPISourceSelectRows = -70
BuiltinMetricIDSystemMetricScrapeDuration = -71
// [-1000..-1200] reserved by host system metrics

// metric names used in code directly
BuiltinMetricNameAggBucketReceiveDelaySec = "__agg_bucket_receive_delay_sec"
BuiltinMetricNameAgentSamplingFactor = "__src_sampling_factor"
BuiltinMetricNameAggSamplingFactor = "__agg_sampling_factor"
BuiltinMetricNameIngestionStatus = "__src_ingestion_status"
BuiltinMetricNameAggMappingCreated = "__agg_mapping_created"
BuiltinMetricNameBadges = "__badges"
BuiltinMetricNamePromScrapeTime = "__prom_scrape_time"
BuiltinMetricNameAPIRPCServiceTime = "__api_rpc_service_time"
BuiltinMetricNameUsageMemory = "__usage_mem"
BuiltinMetricNameUsageCPU = "__usage_cpu"
BuiltinMetricNameAPIBRS = "__api_big_response_storage_size"
BuiltinMetricNameAPISelectBytes = "__api_ch_select_bytes"
BuiltinMetricNameAPISelectRows = "__api_ch_select_rows"
BuiltinMetricNameAPISourceSelectRows = "__api_ch_source_select_rows"
BuiltinMetricNameAPISelectDuration = "__api_ch_select_duration"
BuiltinMetricNameAPIEndpointResponseTime = "__api_endpoint_response_time"
BuiltinMetricNameAPIEndpointServiceTime = "__api_endpoint_service_time"
BuiltinMetricNameBudgetHost = "__budget_host"
BuiltinMetricNameBudgetAggregatorHost = "__budget_aggregator_host"
BuiltinMetricNameAPIActiveQueries = "__api_active_queries"
BuiltinMetricNameBudgetUnknownMetric = "__budget_unknown_metric"
BuiltinMetricNameAggBucketReceiveDelaySec = "__agg_bucket_receive_delay_sec"
BuiltinMetricNameAgentSamplingFactor = "__src_sampling_factor"
BuiltinMetricNameAggSamplingFactor = "__agg_sampling_factor"
BuiltinMetricNameIngestionStatus = "__src_ingestion_status"
BuiltinMetricNameAggMappingCreated = "__agg_mapping_created"
BuiltinMetricNameBadges = "__badges"
BuiltinMetricNamePromScrapeTime = "__prom_scrape_time"
BuiltinMetricNameAPIRPCServiceTime = "__api_rpc_service_time"
BuiltinMetricNameUsageMemory = "__usage_mem"
BuiltinMetricNameUsageCPU = "__usage_cpu"
BuiltinMetricNameAPIBRS = "__api_big_response_storage_size"
BuiltinMetricNameAPISelectBytes = "__api_ch_select_bytes"
BuiltinMetricNameAPISelectRows = "__api_ch_select_rows"
BuiltinMetricNameAPISourceSelectRows = "__api_ch_source_select_rows"
BuiltinMetricNameAPISelectDuration = "__api_ch_select_duration"
BuiltinMetricNameAPIEndpointResponseTime = "__api_endpoint_response_time"
BuiltinMetricNameAPIEndpointServiceTime = "__api_endpoint_service_time"
BuiltinMetricNameBudgetHost = "__budget_host"
BuiltinMetricNameBudgetAggregatorHost = "__budget_aggregator_host"
BuiltinMetricNameAPIActiveQueries = "__api_active_queries"
BuiltinMetricNameBudgetUnknownMetric = "__budget_unknown_metric"
BuiltinMetricNameSystemMetricScrapeDuration = "__system_metrics_duration"

TagValueIDBadgeIngestionErrorsOld = -11 // remove from API, then stop writing
TagValueIDBadgeAggMappingErrorsOld = -33 // remove from API, then stop writing
@@ -266,6 +269,12 @@ const (
TagValueIDBuildArch386 = 2
TagValueIDBuildArchARM64 = 3
TagValueIDBuildArchARM = 4

TagValueIDSystemMetricCPU = 1
TagValueIDSystemMetricDisk = 2
TagValueIDSystemMetricMemory = 3
TagValueIDSystemMetricNet = 4
TagValueIDSystemMetricPSI = 5
)

var (
@@ -1307,6 +1316,21 @@ To see which seconds change when, use __contributors_log_rev`,
}),
}},
},
BuiltinMetricIDSystemMetricScrapeDuration: {
Name: BuiltinMetricNameSystemMetricScrapeDuration,
Kind: MetricKindValue,
Description: "System metrics scrape duration in seconds",
Tags: []MetricMetaTag{{
Description: "collector",
ValueComments: convertToValueComments(map[int32]string{
TagValueIDSystemMetricCPU: "cpu",
TagValueIDSystemMetricDisk: "disk",
TagValueIDSystemMetricMemory: "memory",
TagValueIDSystemMetricNet: "net",
TagValueIDSystemMetricPSI: "psi",
}),
}},
},
}

builtinMetricsInvisible = map[int32]bool{
@@ -1317,21 +1341,22 @@

// API and metadata send these metrics via local statshouse instance
builtinMetricsAllowedToReceive = map[int32]bool{
BuiltinMetricIDTimingErrors: true,
BuiltinMetricIDPromScrapeTime: true,
BuiltinMetricIDAPIRPCServiceTime: true,
BuiltinMetricIDAPIBRS: true,
BuiltinMetricIDAPIEndpointResponseTime: true,
BuiltinMetricIDAPIEndpointServiceTime: true,
BuiltinMetricIDUsageMemory: true,
BuiltinMetricIDUsageCPU: true,
BuiltinMetricIDAPIActiveQueries: true,
BuiltinMetricIDAPISelectRows: true,
BuiltinMetricIDAPISelectBytes: true,
BuiltinMetricIDAPISelectDuration: true,
BuiltinMetricIDTimingErrors: true,
BuiltinMetricIDPromScrapeTime: true,
BuiltinMetricIDAPIRPCServiceTime: true,
BuiltinMetricIDAPIBRS: true,
BuiltinMetricIDAPIEndpointResponseTime: true,
BuiltinMetricIDAPIEndpointServiceTime: true,
BuiltinMetricIDUsageMemory: true,
BuiltinMetricIDUsageCPU: true,
BuiltinMetricIDAPIActiveQueries: true,
BuiltinMetricIDAPISelectRows: true,
BuiltinMetricIDAPISelectBytes: true,
BuiltinMetricIDAPISelectDuration: true,
BuiltinMetricIDSystemMetricScrapeDuration: true,
}

metricsWithAgentEnvRouteArch = map[int32]bool{
MetricsWithAgentEnvRouteArch = map[int32]bool{
BuiltinMetricIDAgentDiskCacheErrors: true,
BuiltinMetricIDTimingErrors: true,
BuiltinMetricIDAgentMapping: true,
@@ -1381,6 +1406,7 @@
BuiltinMetricIDAPIActiveQueries: true,
BuiltinMetricIDBudgetHost: true,
BuiltinMetricIDBudgetAggregatorHost: true,
BuiltinMetricIDSystemMetricScrapeDuration: true,
}

BuiltinMetricByName map[string]*MetricMetaValue
@@ -1478,6 +1504,13 @@ func createBuiltinMetricIDHeartbeatArgs(name string, description string) *Metric
}

func init() {
for k, v := range hostMetrics {
v.Tags = append([]MetricMetaTag{{Name: "hostname"}}, v.Tags...)
v.Resolution = 60
BuiltinMetrics[k] = v
builtinMetricsAllowedToReceive[k] = true
metricsWithoutAggregatorID[k] = true
}
for i := 0; i < MaxTags; i++ {
legacyName := tagIDPrefix + strconv.Itoa(i)
tagIDs = append(tagIDs, legacyName)
@@ -1516,7 +1549,10 @@ func init() {
m.Tags[AggShardTag] = MetricMetaTag{Description: "aggregator_shard", Raw: true}
m.Tags[AggReplicaTag] = MetricMetaTag{Description: "aggregator_replica", Raw: true}
}
if metricsWithAgentEnvRouteArch[id] {
if _, ok := hostMetrics[id]; ok {
m.Tags[HostDCTag] = MetricMetaTag{Description: "dc"}
}
if MetricsWithAgentEnvRouteArch[id] {
m.Tags[RouteTag] = MetricMetaTag{Description: "route", ValueComments: convertToValueComments(routeToValue)}
m.Tags[AgentEnvTag] = MetricMetaTag{
Description: "statshouse_env",
Expand Down
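The init() hunk above iterates over a hostMetrics map that is defined elsewhere in the package and not shown in this diff: it prepends a "hostname" tag, forces 60-second resolution, registers each entry in BuiltinMetrics, and later adds a dc tag. For that loop to compile, hostMetrics would need roughly the following shape — a sketch with a made-up ID, name and tag, not content from the commit:

var hostMetrics = map[int32]*MetricMetaValue{
	// hypothetical entry: ID from the range reserved for host system metrics
	-1001: {
		Name:        "__host_example",
		Kind:        MetricKindValue,
		Description: "illustrative host metric",
		Tags:        []MetricMetaTag{{Description: "collector"}},
	},
}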
11 changes: 11 additions & 0 deletions internal/format/format.go
@@ -208,6 +208,17 @@ func (m *MetricMetaValue) UnmarshalBinary(data []byte) error {
return nil
}

func MetricJSON(value *MetricMetaValue) ([]byte, error) {
	if err := value.RestoreCachedInfo(); err != nil {
		return nil, err
	}
	metricBytes, err := json.Marshal(value)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize metric: %w", err)
	}
	return metricBytes, nil
}

func (m DashboardMeta) MarshalBinary() ([]byte, error) { return json.Marshal(m) }
func (m *DashboardMeta) UnmarshalBinary(data []byte) error {
	if err := json.Unmarshal(data, m); err != nil {
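For context, a minimal sketch of how the new MetricJSON helper might be called from elsewhere in the repository (the metric name and values are illustrative, not from this commit; internal/format can only be imported from within the statshouse module):

package main

import (
	"fmt"
	"log"

	"github.com/vkcom/statshouse/internal/format"
)

func main() {
	meta := &format.MetricMetaValue{
		Name:        "example_metric",
		Kind:        format.MetricKindValue,
		Description: "illustrative metric",
	}
	// MetricJSON validates the metadata via RestoreCachedInfo and then marshals it to JSON.
	data, err := format.MetricJSON(meta)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(data))
}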