Skip to content

Commit

Permalink
[INLONG-1908]Adjust the metric realization of TubeMQ (#1909)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang authored Dec 6, 2021
1 parent cee052c commit 1a905c6
Show file tree
Hide file tree
Showing 31 changed files with 1,435 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ public abstract class AbsMetricItem {
protected final String name;
protected final AtomicLong value = new AtomicLong(0);

public AbsMetricItem(MetricType metricType, String name) {
this(metricType, MetricValueType.NORMAL, name, 0);
}

public AbsMetricItem(MetricType metricType, MetricValueType valueType,
String name, long initialValue) {
this.metricType = metricType;
Expand Down Expand Up @@ -67,6 +63,10 @@ public long incrementAndGet() {
return value.incrementAndGet();
}

public boolean compareAndSet(long expect, long update) {
return value.compareAndSet(expect, update);
}

public long decrementAndGet() {
return value.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class CountMetricItem extends AbsMetricItem {

public CountMetricItem(String name) {
super(MetricType.COUNTER, name);
super(MetricType.COUNTER, MetricValueType.MAX, name, 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class GaugeNormMetricItem extends AbsMetricItem {

public GaugeNormMetricItem(String name) {
super(MetricType.GAUGE, MetricValueType.MIN, name, 0);
super(MetricType.GAUGE, MetricValueType.NORMAL, name, 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.inlong.tubemq.corebase.metric;

import java.beans.ConstructorProperties;

public class MetricValue {
private final String type;
private final String name;
private final long value;

@ConstructorProperties({"type", "name", "value"})
public MetricValue(String type, String name, long value) {
this.name = name;
this.type = type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package org.apache.inlong.tubemq.corebase.metric;

public enum MetricValueType {
NORMAL(0, "Normal"),
MIN(1, "Min"),
MAX(2, "Max");
NORMAL(0, "Normal", "Current value"),
MIN(1, "Min", "Historical minimum value"),
MAX(2, "Max", "Historical maximum value");

MetricValueType(int id, String name) {
MetricValueType(int id, String name, String desc) {
this.id = id;
this.name = name;
this.desc = desc;
}

public int getId() {
Expand All @@ -35,6 +36,10 @@ public String getName() {
return name;
}

public String getDesc() {
return desc;
}

public static MetricValueType valueOf(int value) {
for (MetricValueType valueType : MetricValueType.values()) {
if (valueType.getId() == value) {
Expand All @@ -46,4 +51,5 @@ public static MetricValueType valueOf(int value) {

private final int id;
private final String name;
private final String desc;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

package org.apache.inlong.tubemq.corebase.metric;

import java.beans.ConstructorProperties;
import java.util.Map;

public class MetricValues {
private final String lastResetTime;
private final Map<String, Long> metricValues;

@ConstructorProperties({"lastResetTime", "metricValues"})
public MetricValues(String lastResetTime, Map<String, Long> metricValues) {
this.lastResetTime = lastResetTime;
this.metricValues = metricValues;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.tubemq.corebase.metric;

import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricItemTest {
private static final Logger logger =
LoggerFactory.getLogger(MetricItemTest.class);

@Test
public void testMetricItem() {
try {
final CountMetricItem countMetricItem =
new CountMetricItem("CountMetricItem");
final GaugeNormMetricItem gaugeNormMetricItem =
new GaugeNormMetricItem("GaugeNormMetricItem");
final GaugeMaxMetricItem gaugeMaxMetricItem =
new GaugeMaxMetricItem("GaugeMaxMetricItem");
final GaugeMinMetricItem gaugeMinMetricItem =
new GaugeMinMetricItem("GaugeMinMetricItem");

countMetricItem.incrementAndGet();
countMetricItem.incrementAndGet();
countMetricItem.incrementAndGet();
countMetricItem.decrementAndGet();

gaugeNormMetricItem.update(1000);
gaugeNormMetricItem.update(2000);
gaugeNormMetricItem.update(500);

gaugeMaxMetricItem.update(1000);
gaugeMaxMetricItem.update(5000);
gaugeMaxMetricItem.update(3000);

gaugeMinMetricItem.update(1000);
gaugeMinMetricItem.update(1);
gaugeMinMetricItem.update(10000);

Assert.assertEquals(2, countMetricItem.getValue());
Assert.assertEquals(500, gaugeNormMetricItem.getValue());
Assert.assertEquals(5000, gaugeMaxMetricItem.getValue());
Assert.assertEquals(1, gaugeMinMetricItem.getValue());

countMetricItem.getAndSet();
gaugeNormMetricItem.getAndSet();
gaugeMaxMetricItem.getAndSet();
gaugeMinMetricItem.getAndSet();

Assert.assertEquals(0, countMetricItem.getValue());
Assert.assertEquals(500, gaugeNormMetricItem.getValue());
Assert.assertEquals(0, gaugeMaxMetricItem.getValue());
Assert.assertEquals(Long.MAX_VALUE, gaugeMinMetricItem.getValue());

Assert.assertEquals(MetricType.COUNTER.getId(),
countMetricItem.getMetricType().getId());
Assert.assertEquals(MetricValueType.MAX.getId(),
countMetricItem.getMetricValueType().getId());
Assert.assertEquals(MetricType.GAUGE.getId(),
gaugeNormMetricItem.getMetricType().getId());
Assert.assertEquals(MetricValueType.NORMAL.getId(),
gaugeNormMetricItem.getMetricValueType().getId());
Assert.assertEquals(MetricType.GAUGE.getId(),
gaugeMaxMetricItem.getMetricType().getId());
Assert.assertEquals(MetricValueType.MAX.getId(),
gaugeMaxMetricItem.getMetricValueType().getId());
Assert.assertEquals(MetricType.GAUGE.getId(),
gaugeMinMetricItem.getMetricType().getId());
Assert.assertEquals(MetricValueType.MIN.getId(),
gaugeMinMetricItem.getMetricValueType().getId());
} catch (Exception ex) {
logger.error("error happens" + ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ private RegisterResponseB2C inProcessConsumerRegister(final String clientId, fin
consumerNodeInfo = new ConsumerNodeInfo(storeManager, reqQryPriorityId,
clientId, filterCondSet, reqSessionKey, reqSessionTime, true, partStr);
if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
BrokerMetricsHolder.METRICS.consumerOnlineCnt.incrementAndGet();
BrokerMetricsHolder.incConsumerCnt();
}
heartbeatManager.regConsumerNode(getHeartbeatNodeId(clientId, partStr), clientId, partStr);
MessageStore dataStore = null;
Expand Down Expand Up @@ -891,7 +891,7 @@ private RegisterResponseB2C inProcessConsumerRegister(final String clientId, fin
heartbeatManager.getConsumerRegMap().get(getHeartbeatNodeId(consumerId, partStr));
if (timeoutInfo == null || System.currentTimeMillis() >= timeoutInfo.getTimeoutTime()) {
if (consumerRegisterMap.remove(partStr) != null) {
BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
BrokerMetricsHolder.decConsumerCnt(true);
}
strBuffer.append("[Duplicated Register] Remove Invalid Consumer Register ")
.append(consumerId).append(TokenConstants.SEGMENT_SEP).append(partStr);
Expand Down Expand Up @@ -957,7 +957,7 @@ private RegisterResponseB2C inProcessConsumerUnregister(final String clientId, f
.append(request.getPartitionId()).append(" updatedOffset:").append(updatedOffset).toString());
strBuffer.delete(0, strBuffer.length());
if (consumerRegisterMap.remove(partStr) != null) {
BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
BrokerMetricsHolder.decConsumerCnt(false);
}
heartbeatManager.unRegConsumerNode(
getHeartbeatNodeId(clientId, partStr));
Expand Down Expand Up @@ -1245,8 +1245,7 @@ public void onTimeout(final String nodeId, TimeoutInfo nodeInfo) {
}
if (consumerNodeInfo.getConsumerId().equalsIgnoreCase(nodeInfo.getSecondKey())) {
if (consumerRegisterMap.remove(nodeInfo.getThirdKey()) != null) {
BrokerMetricsHolder.METRICS.consumerOnlineCnt.decrementAndGet();
BrokerMetricsHolder.METRICS.consumerTmoTotCnt.decrementAndGet();
BrokerMetricsHolder.decConsumerCnt(true);
}
String[] groupTopicPart =
consumerNodeInfo.getPartStr().split(TokenConstants.ATTR_SEP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ public void run() {
if (!response.getSuccess()) {
isKeepAlive.set(false);
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
BrokerMetricsHolder.METRICS
.masterNoNodeCnt.incrementAndGet();
BrokerMetricsHolder.incMasterNoNodeCnt();
register2Master();
heartbeatErrors.set(0);
logger.info("Re-register to master successfully!");
Expand All @@ -234,7 +233,7 @@ public void run() {
isKeepAlive.set(false);
heartbeatErrors.incrementAndGet();
samplePrintCtrl.printExceptionCaught(t);
BrokerMetricsHolder.METRICS.hbExceptionCnt.incrementAndGet();
BrokerMetricsHolder.incHBExceptionCnt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import org.apache.inlong.tubemq.corebase.metric.MetricValues;

/**
* BrokerMonitorMXBean
* Provide access interface of a metric item with JMX.<br>
* Decouple between metric item and monitor system, in particular scene, <br>
* inlong can depend on user-defined monitor system.
* BrokerMetricMXBean
* Broker's metric data access interface, including:
* the getMetric() that directly obtains data
* the getAndReSetMetrics() that can clear the values of
* the counter, maximum and minimum extremum Gauge data
*/
public interface BrokerMetricMXBean {

// get current metric data by viewing mode
MetricValues getMetrics();

// get current metric data and reset the Counter, maximum/minimum Gauge metric
MetricValues getAndReSetMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,33 @@ public class BrokerMetrics implements BrokerMetricMXBean {

private final AtomicLong lastResetTime =
new AtomicLong(System.currentTimeMillis());
public final AbsMetricItem syncDataDurMin =
new GaugeMinMetricItem("fSync_latency_min");
public final AbsMetricItem syncDataDurMax =
new GaugeMaxMetricItem("fSync_latency_max");
public final AbsMetricItem syncZkDurMin =
new GaugeMinMetricItem("zkSync_latency_min");
public final AbsMetricItem syncZkDurMax =
new GaugeMaxMetricItem("zkSync_latency_max");
public final AbsMetricItem zkExceptionCnt =
// Delay statistics for syncing data to files
protected final AbsMetricItem syncDataDurMin =
new GaugeMinMetricItem("fSync_duration_min");
protected final AbsMetricItem syncDataDurMax =
new GaugeMaxMetricItem("fSync_duration_max");
// Delay statistics for syncing data to Zookeeper
protected final AbsMetricItem syncZkDurMin =
new GaugeMinMetricItem("zkSync_duration_min");
protected final AbsMetricItem syncZkDurMax =
new GaugeMaxMetricItem("zkSync_duration_max");
// Zookeeper Exception statistics
protected final AbsMetricItem zkExceptionCnt =
new CountMetricItem("zk_exception_cnt");
public final AbsMetricItem masterNoNodeCnt =
// Broker 2 Master status statistics
protected final AbsMetricItem masterNoNodeCnt =
new CountMetricItem("online_timeout_cnt");
public final AbsMetricItem hbExceptionCnt =
protected final AbsMetricItem hbExceptionCnt =
new CountMetricItem("hb_master_exception_cnt");
public final AbsMetricItem ioExceptionCnt =
// Disk IO Exception statistics
protected final AbsMetricItem ioExceptionCnt =
new CountMetricItem("io_exception_cnt");
public final AbsMetricItem consumerOnlineCnt =
// Consumer client statistics
protected final AbsMetricItem consumerOnlineCnt =
new GaugeNormMetricItem("consumer_online_cnt");
public final AbsMetricItem consumerTmoTotCnt =
protected final AbsMetricItem consumerTmoTotCnt =
new CountMetricItem("consumer_timeout_cnt");

public BrokerMetrics() {
this.lastResetTime.set(System.currentTimeMillis());
}

@Override
public MetricValues getMetrics() {
Map<String, Long> metricValues = new HashMap<>();
Expand Down Expand Up @@ -92,5 +94,49 @@ public MetricValues getAndReSetMetrics() {
return new MetricValues(
WebParameterUtils.date2yyyyMMddHHmmss(new Date(befTime)), metricValues);
}

public long getLastResetTime() {
return lastResetTime.get();
}

public AbsMetricItem getSyncDataDurMin() {
return syncDataDurMin;
}

public AbsMetricItem getSyncDataDurMax() {
return syncDataDurMax;
}

public AbsMetricItem getSyncZkDurMin() {
return syncZkDurMin;
}

public AbsMetricItem getSyncZkDurMax() {
return syncZkDurMax;
}

public AbsMetricItem getZkExceptionCnt() {
return zkExceptionCnt;
}

public AbsMetricItem getMasterNoNodeCnt() {
return masterNoNodeCnt;
}

public AbsMetricItem getHbExceptionCnt() {
return hbExceptionCnt;
}

public AbsMetricItem getIoExceptionCnt() {
return ioExceptionCnt;
}

public AbsMetricItem getConsumerOnlineCnt() {
return consumerOnlineCnt;
}

public AbsMetricItem getConsumerTmoTotCnt() {
return consumerTmoTotCnt;
}
}

Loading

0 comments on commit 1a905c6

Please sign in to comment.