-
Notifications
You must be signed in to change notification settings - Fork 9
/
kafka_jolokia_reporter.py
175 lines (135 loc) · 6.32 KB
/
kafka_jolokia_reporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from __future__ import print_function

import argparse
import json
import re

try:
    import httplib  # Python 2
except ImportError:
    import http.client as httplib  # Python 3 renamed the module
# VERSION 0.2
# FROM https://github.com/paksu/kafka-jolokia-telegraf-translator
# Value names containing any of these strings are skipped by
# translate_values, because we don't want to collect aggregated data
# (percentiles, means and rates).
ignored_metrics = [
'Percentile', 'Mean', 'Rate'
]
def tokenize_metric_path(path):
    """
    Tokenizes a metric path for InfluxDB

    The path is split into 3 parts which are returned in this order:
    metric_name, value_prefix and tags

    Example:
    path = "kafka.server:delayedOperation=Fetch,name=NumDelayedOperations,type=DelayedOperationPurgatory"
    results in:
    - metric_name = kafka.server
    - value_prefix = DelayedOperationPurgatory
    - tags = {'delayedOperation': 'Fetch', 'name': 'NumDelayedOperations'}

    Raises ValueError if the path has no ':' separator, IndexError if a
    property has no '=' and KeyError if there is no 'type' property.
    """
    # Split on the first ':' only, property values may contain colons themselves
    metric_name, metric_path = path.split(":", 1)

    tokens = {}
    for token in metric_path.split(','):
        # Split on the first '=' only so that values containing '=' stay intact
        key_value = token.split("=", 1)
        tokens[key_value[0]] = key_value[1]

    # the 'type' field from the metric path as value_prefix, rest of the tokens in the metric path are tags
    value_prefix = tokens.pop('type')
    return metric_name, value_prefix, tokens
def get_metrics(value_prefix, values):
    """
    Converts a jolokia response valueset into a list of (name, value) tuples.
    Any non-numeric values will be skipped. Booleans are instances of int and
    therefore pass the numeric check.

    Example:
    value_prefix = "foo"
    values = { "Value":1233.2 }
    is translated to: [("foo", 1233.2)]

    value_prefix = "foo"
    values = { "Count":0, "RateUnit":"SECONDS", "MeanRate":4.0 }
    is translated to: [("foo.Count", 0), ("foo.MeanRate", 4.0)]
    """
    try:
        numeric_types = (int, long, float, complex)  # Python 2
    except NameError:
        numeric_types = (int, float, complex)  # Python 3 has no separate long type

    # Strip non-numeric values from the valueset
    values = {k: v for k, v in values.items() if isinstance(v, numeric_types)}

    if 'Value' in values:
        # If the valueset contains a numeric Value then use only that
        return [(value_prefix, values['Value'])]

    # Otherwise append the attribute names to value_prefix
    # and combine them with the actual numeric value
    return [("{}.{}".format(value_prefix, value_suffix), value) for value_suffix, value in values.items()]
def translate_values(metric, values):
    """
    Translates the given metric and valueset to a list of InfluxDB line protocol metrics

    Tags are written sorted by key so the output does not depend on dict
    ordering. Values whose name contains any of the strings in the
    module-level ignored_metrics list are left out.

    metric = "kafka.server:delayedOperation=Fetch,name=NumDelayedOperations,type=DelayedOperationPurgatory"
    values = { "Value":1233.2 }
    results to:
    [
        kafka.server,delayedOperation=Fetch,name=NumDelayedOperations DelayedOperationPurgatory=1233.2
    ]

    metric = "kafka.server:delayedOperation=Fetch,name=NumDelayedOperations,type=DelayedOperationPurgatory"
    values = { "Count":1233.2, "SomeMetric":123 }
    results to:
    [
        kafka.server,delayedOperation=Fetch,name=NumDelayedOperations DelayedOperationPurgatory.Count=1233.2
        kafka.server,delayedOperation=Fetch,name=NumDelayedOperations DelayedOperationPurgatory.SomeMetric=123
    ]
    """
    metric_name, value_prefix, tags = tokenize_metric_path(metric)

    # Every tag is prefixed with ',' so an empty tag set yields an empty string
    tag_string = ''.join(',{}={}'.format(k, v) for k, v in sorted(tags.items()))

    metrics = []
    for value_name, value in get_metrics(value_prefix, values):
        # Plain substring test: the ignore entries are literal strings, not regular expressions
        if any(ignored in value_name for ignored in ignored_metrics):
            continue
        metrics.append("{}{} {}={}".format(metric_name, tag_string, value_name, value))
    return metrics
def fetch_jmx_from_jolokia(host, port, jolokia_context, metric):
    """
    Fetches the given metric string from jolokia

    Issues a GET for "<jolokia_context>/read/<metric>" against host:port and
    returns the raw response body. The connection is closed before returning.

    Raises httplib.HTTPException if the HTTP status is not 200.
    """
    # Drop trailing slashes so exactly one '/' separates the context from 'read'
    path = "{}/read/{}".format(jolokia_context.rstrip("/"), metric)

    conn = httplib.HTTPConnection(host, port)
    try:
        conn.request("GET", path)
        response = conn.getresponse()
        # Explicit check instead of assert, asserts are stripped when running with -O
        if response.status != 200:
            raise httplib.HTTPException(
                "Jolokia returned HTTP {} for {}".format(response.status, path))
        return response.read()
    finally:
        conn.close()
def translate_response(response, ignored_metrics=None):
    """
    Parses a Kafka JMX metrics response from jolokia and converts it to set of InfluxDB Line protocol

    response is the JSON text returned by jolokia. A list of line protocol
    strings is returned; the list is empty when the response has no 'value'
    entry or when that entry is empty or not a JSON object.

    ignored_metrics is accepted for backward compatibility only and is not
    used by this function.

    Currently supports at least Kafka 0.9 and Influxdb 0.13

    https://jolokia.org
    https://docs.influxdata.com/influxdb/v0.13/write_protocols/line/
    http://kafka.apache.org/090/documentation.html#monitoring
    """
    response = json.loads(response)

    if 'value' not in response:
        return []

    values = response['value']
    if not isinstance(values, dict) or not values:
        # Nothing that can be translated into attribute=value pairs
        return []

    # Check if the response is a flat data structure containing values for only a single metric
    if not any(isinstance(value, dict) for value in values.values()):
        # No nested data found in the values so we are dealing with a single metric
        return translate_values(response['request']['mbean'], values)

    # This is a multi-value response containing multiple metrics
    metrics = []
    for metric, value in values.items():
        metrics.extend(translate_values(metric, value))
    return metrics
if __name__ == "__main__":
    # Fetch each configured metric from jolokia and print the translated
    # InfluxDB line protocol lines to stdout, one per line
    parser = argparse.ArgumentParser(
        description='Fetch Kafka JMX metrics from Jolokia and print them as InfluxDB line protocol')
    parser.add_argument('--jolokia-host', default='localhost', help='Jolokia host')
    parser.add_argument('--jolokia-port', type=int, default=8778, help='Jolokia port')
    parser.add_argument('--jolokia-context', default='/jolokia', help='Jolokia context')
    args = parser.parse_args()

    # Collect these metrics
    # NOTE(review): 'kafka.server:*' presumably already covers the explicit
    # kafka.server entry further down, which would print it twice - confirm
    metrics = [
        'kafka.server:*',
        'kafka.controller:*',
        'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce',
        'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer',
        'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower',
        'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce',
        'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer',
        'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower',
        'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent',
        'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs',
    ]

    for metric in metrics:
        response = fetch_jmx_from_jolokia(args.jolokia_host, args.jolokia_port, args.jolokia_context, metric)
        for line in translate_response(response):
            print(line)