-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
123 lines (102 loc) · 2.92 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Command-line flags. The pointers are dereferenced elsewhere in this file
// after main has called flag.Parse.
var (
// addr is the address the exporter's own HTTP server listens on
// (flag -listen-address, default ":8080").
addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.")
// druidUri is the URL the task-summary SQL query is POSTed to
// (flag -druid-uri).
// NOTE(review): the default host "BROKER" looks like a placeholder that
// must be overridden in a real deployment — confirm.
druidUri = flag.String("druid-uri", "http://BROKER:8082/druid/v2/sql/", "The URI to reach Druid's router or broker SQL API.")
)
// Task is one row of the task summary returned by the Druid SQL query:
// a (type, runner_status) pair together with how many tasks share it.
// The fields carry no JSON tags; encoding/json matches the lower-case
// column names of the query result to them case-insensitively.
type Task struct {
	// Type is the value of the "type" column and Runner_Status the value
	// of the "runner_status" column; together they become the metric labels.
	Type, Runner_Status string
	// Total is the count(*) for this pair and becomes the gauge value.
	Total int
}
// DruidTasksExporter is the collector that main registers with the
// Prometheus registry. It holds the descriptor of the single metric it
// exposes; its Describe and Collect methods are defined further down.
type DruidTasksExporter struct {
// Tasks is the descriptor sent by Describe and attached to every
// constant metric built in Collect.
Tasks *prometheus.Desc
}
// NewDruidTasksExporter returns an exporter whose Tasks descriptor is the
// metric "dte_druid_tasks_total" with the variable labels "type" and
// "runner_status" and an empty set of constant labels.
func NewDruidTasksExporter() *DruidTasksExporter {
	variableLabels := []string{"type", "runner_status"}
	constLabels := prometheus.Labels{}

	exporter := new(DruidTasksExporter)
	exporter.Tasks = prometheus.NewDesc(
		"dte_druid_tasks_total",
		"Total number of Druid tasks per type and status.",
		variableLabels,
		constLabels,
	)
	return exporter
}
// druidQueryTimeout bounds one full round trip to the Druid SQL API so that
// a stalled broker cannot block a scrape forever.
const druidQueryTimeout = 10 * time.Second

// druidClient is shared by all scrapes. http.DefaultClient is avoided
// because it has no timeout.
var druidClient = &http.Client{Timeout: druidQueryTimeout}

// Label values that are always exported, with a zero count when Druid
// reports no matching tasks, so the time series do not disappear.
var (
	runnerStatuses = []string{"NONE", "PENDING", "RUNNING", "WAITING"}
	taskTypes      = []string{"single_phase_sub_task", "index", "index_parallel", "kill", "compact"}
)

// fetchTasks POSTs the task-summary SQL query to the endpoint configured by
// -druid-uri and decodes the JSON rows of the response.
//
// It returns an error when the request cannot be built or sent, the body
// cannot be read, the status is not 200 OK, or the body is not a JSON array
// of task rows. The returned slice is nil whenever the error is non-nil.
func (d *DruidTasksExporter) fetchTasks() ([]Task, error) {
	query, err := json.Marshal(map[string]string{
		"query": "SELECT type,runner_status,count(*) AS total FROM sys.tasks GROUP BY type,runner_status",
	})
	if err != nil {
		return nil, fmt.Errorf("encoding Druid query: %w", err)
	}

	resp, err := druidClient.Post(*druidUri, "application/json", bytes.NewReader(query))
	if err != nil {
		return nil, fmt.Errorf("querying Druid: %w", err)
	}
	defer resp.Body.Close()

	// The body is read before the status check so an error response from
	// Druid can be included in the returned error.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading Druid response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected Druid status %s: %s", resp.Status, body)
	}

	var tasks []Task
	if err := json.Unmarshal(body, &tasks); err != nil {
		return nil, fmt.Errorf("decoding Druid response %s: %w", body, err)
	}
	return tasks, nil
}

// RetrieveMetrics queries Druid and returns one Task per (type,
// runner_status) pair that currently has tasks. On failure the error is
// logged and nil is returned; the process is no longer terminated, so a
// transient Druid outage does not take the exporter down.
func (d *DruidTasksExporter) RetrieveMetrics() []Task {
	tasks, err := d.fetchTasks()
	if err != nil {
		log.Printf("An error occurred while retrieving Druid tasks: %v", err)
		return nil
	}
	return tasks
}

// Describe sends the exporter's single metric descriptor to ch.
func (d *DruidTasksExporter) Describe(ch chan<- *prometheus.Desc) {
	ch <- d.Tasks
}

// Collect queries Druid and sends one gauge per (type, runner_status) pair
// to ch: first the rows Druid returned, then a zero-valued gauge for every
// combination of runnerStatuses and taskTypes that Druid did not report.
//
// If the query fails, a single invalid metric carrying the error is sent
// instead. That makes the registry report the scrape as failed rather than
// publishing zeros that would look like "no tasks".
func (d *DruidTasksExporter) Collect(ch chan<- prometheus.Metric) {
	tasks, err := d.fetchTasks()
	if err != nil {
		log.Printf("An error occurred while retrieving Druid tasks: %v", err)
		ch <- prometheus.NewInvalidMetric(d.Tasks, err)
		return
	}

	// Index what Druid returned so each expected pair is a single lookup.
	type pair struct{ taskType, status string }
	seen := make(map[pair]struct{}, len(tasks))
	for _, task := range tasks {
		seen[pair{task.Type, task.Runner_Status}] = struct{}{}
	}

	for _, status := range runnerStatuses {
		for _, taskType := range taskTypes {
			if _, found := seen[pair{taskType, status}]; !found {
				tasks = append(tasks, Task{Type: taskType, Runner_Status: status, Total: 0})
			}
		}
	}

	for _, task := range tasks {
		ch <- prometheus.MustNewConstMetric(
			d.Tasks,
			prometheus.GaugeValue,
			float64(task.Total),
			task.Type,
			task.Runner_Status,
		)
	}
}
// ok answers every request it receives with the plain body "ok". main
// mounts it on "/", so it serves any path not claimed by another handler.
//
// A failed write is only logged. By the time io.WriteString returns an
// error the 200 status line has already been committed, so calling
// http.Error here could not change the status; it would only trigger a
// superfluous WriteHeader and a second write on a connection that has
// already failed.
func ok(w http.ResponseWriter, _ *http.Request) {
	if _, err := io.WriteString(w, "ok"); err != nil {
		fmt.Println("Error writing response:", err)
	}
}
func main() {
flag.Parse()
druid := NewDruidTasksExporter()
reg := prometheus.NewPedanticRegistry()
reg.MustRegister(druid)
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
http.HandleFunc("/", ok)
log.Printf("The server is listening on %s", *addr)
log.Fatal(http.ListenAndServe(*addr, nil))
}